From 905da506252c98b57cc0cfc82ee5a453c0e15e9b Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 14 Aug 2013 09:30:53 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java | 155 ++-----
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java | 13
opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java | 143 +++---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationIterator.java | 13
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 536 ++++++++++++++-------------
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 58 +-
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java | 151 +++----
opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java | 43 -
8 files changed, 527 insertions(+), 585 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index a040f00..2dbda07 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -27,35 +27,21 @@
*/
package org.opends.server.replication.server;
+import java.io.IOException;
+import java.util.*;
+import java.util.zip.DataFormatException;
+
+import org.opends.messages.Message;
+import org.opends.server.replication.common.*;
+import org.opends.server.replication.protocol.*;
+import org.opends.server.types.*;
+
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.StatusMachine.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.zip.DataFormatException;
-
-import org.opends.messages.Message;
-import org.opends.server.replication.common.AssuredMode;
-import org.opends.server.replication.common.DSInfo;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.common.StatusMachine;
-import org.opends.server.replication.common.StatusMachineEvent;
-import org.opends.server.replication.protocol.*;
-import org.opends.server.types.Attribute;
-import org.opends.server.types.AttributeBuilder;
-import org.opends.server.types.Attributes;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.ResultCode;
-
/**
* This class defines a server handler, which handles all interaction with a
* peer server (RS or DS).
@@ -104,11 +90,11 @@
* @param newGenId The new generation id to take into account
* @throws IOException If IO error occurred.
*/
- public void changeStatusForResetGenId(long newGenId)
- throws IOException
+ public void changeStatusForResetGenId(long newGenId) throws IOException
{
- StatusMachineEvent event;
+ final int localRsServerId = replicationServer.getServerId();
+ StatusMachineEvent event;
if (newGenId == -1)
{
// The generation id is being made invalid, let's put the DS
@@ -127,9 +113,8 @@
if (debugEnabled())
{
TRACER.debugInfo(
- "In RS " +
- replicationServerDomain.getReplicationServer().getServerId() +
- ". Closing connection to DS " + getServerId() +
+ "In RS " + localRsServerId +
+ ", closing connection to DS " + getServerId() +
" for baseDn " + getBaseDN() +
" to force reconnection as new local" +
" generationId and remote one match and DS is in bad gen id: " +
@@ -140,20 +125,19 @@
// would rewait the RSD lock that we already must have entering this
// method. This would lead to a reentrant lock which we do not want.
// So simply close the session, this will make the hang up appear
- // after the reader thread that took the RSD lock realeases it.
- if (session != null)
+ // after the reader thread that took the RSD lock releases it.
+ if (session != null
+ // V4 protocol introduced a StopMsg to properly close the
+ // connection between servers
+ && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
- // V4 protocol introduces a StopMsg to properly close the
- // connection between servers
- if (getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ try
{
- try
- {
- session.publish(new StopMsg());
- } catch (IOException ioe)
- {
- // Anyway, going to close session, so nothing to do
- }
+ session.publish(new StopMsg());
+ }
+ catch (IOException ioe)
+ {
+ // Anyway, going to close session, so nothing to do
}
}
@@ -165,12 +149,10 @@
{
if (debugEnabled())
{
- TRACER.debugInfo(
- "In RS " +
- replicationServerDomain.getReplicationServer().getServerId() +
- ". DS " + getServerId() + " for baseDn " + getBaseDN() +
- " has already generation id " + newGenId +
- " so no ChangeStatusMsg sent to him.");
+ TRACER.debugInfo("In RS " + localRsServerId + ". DS "
+ + getServerId() + " for baseDn " + getBaseDN()
+ + " has already generation id " + newGenId
+ + " so no ChangeStatusMsg sent to him.");
}
return;
}
@@ -182,14 +164,13 @@
}
}
- if ((event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT) &&
- (status == ServerStatus.FULL_UPDATE_STATUS))
+ if (event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT
+ && status == ServerStatus.FULL_UPDATE_STATUS)
{
// Prevent useless error message (full update status cannot lead to bad
// gen status)
Message message = NOTE_BAD_GEN_ID_IN_FULL_UPDATE.get(
- Integer.toString(replicationServerDomain.
- getReplicationServer().getServerId()),
+ Integer.toString(localRsServerId),
getBaseDN(),
Integer.toString(serverId),
Long.toString(generationId),
@@ -214,11 +195,9 @@
if (debugEnabled())
{
- TRACER.debugInfo(
- "In RS " +
- replicationServerDomain.getReplicationServer().getServerId() +
- " Sending change status for reset gen id to " + getServerId() +
- " for baseDn " + getBaseDN() + ":\n" + csMsg);
+ TRACER.debugInfo("In RS " + localRsServerId
+ + " Sending change status for reset gen id to " + getServerId()
+ + " for baseDn " + getBaseDN() + ":\n" + csMsg);
}
session.publish(csMsg);
@@ -257,11 +236,9 @@
if (debugEnabled())
{
- TRACER.debugInfo(
- "In RS " +
- replicationServerDomain.getReplicationServer().getServerId() +
- " Sending change status from status analyzer to " + getServerId() +
- " for baseDn " + getBaseDN() + ":\n" + csMsg);
+ TRACER.debugInfo("In RS " + replicationServer.getServerId()
+ + " Sending change status from status analyzer to " + getServerId()
+ + " for baseDn " + getBaseDN() + ":\n" + csMsg);
}
session.publish(csMsg);
@@ -296,14 +273,13 @@
// Add the specific DS ones
attributes.add(Attributes.create("replica", serverURL));
attributes.add(Attributes.create("connected-to",
- this.replicationServerDomain.getReplicationServer()
- .getMonitorInstanceName()));
+ this.replicationServer.getMonitorInstanceName()));
MonitorData md = replicationServerDomain.getDomainMonitorData();
// Oldest missing update
- Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
- if ((approxFirstMissingDate != null) && (approxFirstMissingDate > 0))
+ long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
+ if (approxFirstMissingDate > 0)
{
Date date = new Date(approxFirstMissingDate);
attributes.add(Attributes.create(
@@ -314,14 +290,12 @@
}
// Missing changes
- long missingChanges = md.getMissingChanges(serverId);
- attributes.add(Attributes.create("missing-changes", String
- .valueOf(missingChanges)));
+ attributes.add(Attributes.create("missing-changes",
+ String.valueOf(md.getMissingChanges(serverId))));
// Replication delay
- long delay = md.getApproxDelay(serverId);
- attributes.add(Attributes.create("approximate-delay", String
- .valueOf(delay)));
+ attributes.add(Attributes.create("approximate-delay",
+ String.valueOf(md.getApproxDelay(serverId))));
/* get the Server State */
AttributeBuilder builder = new AttributeBuilder("server-state");
@@ -541,18 +515,9 @@
{
Message errMessage = ERR_DS_DISCONNECTED_DURING_HANDSHAKE.get(
Integer.toString(inServerStartMsg.getServerId()),
- Integer.toString(replicationServerDomain.getReplicationServer().
- getServerId()));
+ Integer.toString(replicationServer.getServerId()));
throw new DirectoryException(ResultCode.OTHER, errMessage);
}
- catch (NotSupportedOldVersionPDUException e)
- {
- // We do not need to support DS V1 connection, we just accept RS V1
- // connection:
- // We just trash the message, log the event for debug purpose and close
- // the connection
- throw new DirectoryException(ResultCode.OTHER, null, null);
- }
catch (Exception e)
{
// We do not need to support DS V1 connection, we just accept RS V1
@@ -588,7 +553,7 @@
}
finally
{
- if ((replicationServerDomain != null) &&
+ if (replicationServerDomain != null &&
replicationServerDomain.hasLock())
replicationServerDomain.release();
}
@@ -610,21 +575,20 @@
{
// Peer DS uses protocol < V4 : send it a ReplServerStartMsg
startMsg = new ReplServerStartMsg(getReplicationServerId(),
- getReplicationServerURL(), getBaseDN(), maxRcvWindow,
+ getReplicationServerURL(), getBaseDN(), maxRcvWindow,
replicationServerDomain.getDbServerState(),
localGenerationId, sslEncryption, getLocalGroupId(),
- replicationServerDomain.getReplicationServer()
- .getDegradedStatusThreshold());
+ replicationServer.getDegradedStatusThreshold());
}
else
{
// Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
startMsg = new ReplServerStartDSMsg(getReplicationServerId(),
- getReplicationServerURL(), getBaseDN(), maxRcvWindow,
+ getReplicationServerURL(), getBaseDN(), maxRcvWindow,
replicationServerDomain.getDbServerState(),
localGenerationId, sslEncryption, getLocalGroupId(),
- replicationServerDomain.getReplicationServer()
- .getDegradedStatusThreshold(), replicationServer.getWeight(),
+ replicationServer.getDegradedStatusThreshold(),
+ replicationServer.getWeight(),
replicationServerDomain.getConnectedLDAPservers().size());
}
@@ -651,17 +615,10 @@
{
if (serverId != 0)
{
- StringBuilder builder = new StringBuilder("Replica DS(");
- builder.append(serverId);
- builder.append(") for domain \"");
- builder.append(replicationServerDomain.getBaseDn());
- builder.append("\"");
- return builder.toString();
+ return "Replica DS(" + serverId + ") for domain \""
+ + replicationServerDomain.getBaseDn() + "\"";
}
- else
- {
- return "Unknown server";
- }
+ return "Unknown server";
}
/**
@@ -740,7 +697,7 @@
else
{
// We are an empty ReplicationServer
- if ((generationId > 0) && (!getServerState().isEmpty()))
+ if (generationId > 0 && !getServerState().isEmpty())
{
// If the LDAP server has already sent changes
// it is not expected to connect to an empty RS
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index eb5ed4e..eae646a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -365,20 +365,19 @@
if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
// Peer DS uses protocol < V4 : send it a ReplServerStartMsg
- startMsg = new ReplServerStartMsg(getReplicationServerId(),
- getReplicationServerURL(), getBaseDN(), maxRcvWindow,
- replicationServerDomain.getDbServerState(),
- localGenerationId, sslEncryption, getLocalGroupId(),
- replicationServerDomain.getReplicationServer()
- .getDegradedStatusThreshold());
+ startMsg = new ReplServerStartMsg(getReplicationServerId(),
+ getReplicationServerURL(), getBaseDN(), maxRcvWindow,
+ replicationServerDomain.getDbServerState(),
+ localGenerationId, sslEncryption, getLocalGroupId(),
+ replicationServer.getDegradedStatusThreshold());
}
else
{
// Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
- startMsg = new ReplServerStartDSMsg(getReplicationServerId(),
- getReplicationServerURL(), getBaseDN(), maxRcvWindow,
- new ServerState(), localGenerationId, sslEncryption,
- getLocalGroupId(), 0, replicationServer.getWeight(), 0);
+ startMsg = new ReplServerStartDSMsg(getReplicationServerId(),
+ getReplicationServerURL(), getBaseDN(), maxRcvWindow,
+ new ServerState(), localGenerationId, sslEncryption,
+ getLocalGroupId(), 0, replicationServer.getWeight(), 0);
}
send(startMsg);
@@ -556,15 +555,13 @@
catch(DirectoryException de)
{
TRACER.debugCaught(DebugLogLevel.ERROR, de);
- if (draftCNDbIter != null)
- draftCNDbIter.releaseCursor();
+ releaseIterator();
throw de;
}
catch(Exception e)
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
- if (draftCNDbIter != null)
- draftCNDbIter.releaseCursor();
+ releaseIterator();
throw new DirectoryException(
ResultCode.OPERATIONS_ERROR,
Message.raw(Category.SYNC,
@@ -917,11 +914,7 @@
{
if (debugEnabled())
TRACER.debugInfo(this + " shutdown()" + draftCNDbIter);
- if (this.draftCNDbIter != null)
- {
- draftCNDbIter.releaseCursor();
- draftCNDbIter = null;
- }
+ releaseIterator();
for (DomainContext domainCtxt : domainCtxts) {
if (!domainCtxt.unRegisterHandler()) {
logError(Message.raw(Category.SYNC, Severity.NOTICE,
@@ -934,6 +927,15 @@
domainCtxts = null;
}
+ private void releaseIterator()
+ {
+ if (this.draftCNDbIter != null)
+ {
+ this.draftCNDbIter.releaseCursor();
+ this.draftCNDbIter = null;
+ }
+ }
+
/**
* Request to shutdown the associated writer.
*/
@@ -1112,7 +1114,7 @@
{
session.publish(
new ErrorMsg(
- replicationServerDomain.getReplicationServer().getServerId(),
+ replicationServer.getServerId(),
serverId,
Message.raw(Category.SYNC, Severity.INFORMATION,
"Exception raised: " + e.getMessage())));
@@ -1130,11 +1132,9 @@
registerIntoDomain();
if (debugEnabled())
- TRACER.debugInfo(
- this.getClass().getCanonicalName()+ " " + operationId +
- " initialized: " +
- " " + dumpState() + " " +
- " " + clDomCtxtsToString(""));
+ TRACER.debugInfo(getClass().getCanonicalName() + " " + operationId
+ + " initialized: " + " " + dumpState() + " " + " "
+ + clDomCtxtsToString(""));
}
private void initializeChangelogSearch(StartECLSessionMsg msg)
@@ -1522,12 +1522,8 @@
searchPhase = UNDEFINED_PHASE;
}
- if (draftCNDbIter!=null)
- {
- // End of INIT_PHASE => always release the iterator
- draftCNDbIter.releaseCursor();
- draftCNDbIter = null;
- }
+ // End of INIT_PHASE => always release the iterator
+ releaseIterator();
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
index 0157f44..9107b2d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
@@ -27,14 +27,7 @@
*/
package org.opends.server.replication.server;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
@@ -50,6 +43,8 @@
import org.opends.server.types.Attributes;
import org.opends.server.types.InitializationException;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+
/**
* This class defines a server handler dedicated to the remote LDAP servers
* connected to a remote Replication Server.
@@ -142,13 +137,15 @@
this.protocolVersion = protocolVersion;
if (debugEnabled())
- TRACER.debugInfo(
- "In " +
- replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName()+
- " LWSH for remote server " + this.serverId +
- " connected to:" + this.replServerHandler.getMonitorInstanceName() +
- " ()");
-}
+ TRACER.debugInfo("In " + getLocalRSMonitorInstanceName()
+ + " LWSH for remote server " + this.serverId + " connected to:"
+ + this.replServerHandler.getMonitorInstanceName() + " ()");
+ }
+
+ private String getLocalRSMonitorInstanceName()
+ {
+ return rsDomain.getReplicationServer().getMonitorInstanceName();
+ }
/**
* Creates a DSInfo structure representing this remote DS.
@@ -176,15 +173,11 @@
public void startHandler()
{
if (debugEnabled())
- TRACER.debugInfo(
- "In " +
-replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName() +
- " LWSH for remote server " + this.serverId +
- " connected to:" + this.replServerHandler.getMonitorInstanceName() +
- " start");
+ TRACER.debugInfo("In " + getLocalRSMonitorInstanceName()
+ + " LWSH for remote server " + this.serverId + " connected to:"
+ + this.replServerHandler.getMonitorInstanceName() + " start");
DirectoryServer.deregisterMonitorProvider(this);
DirectoryServer.registerMonitorProvider(this);
-
}
/**
@@ -193,10 +186,8 @@
public void stopHandler()
{
if (debugEnabled())
- TRACER.debugInfo("In "
- + replServerHandler.getDomain().getReplicationServer()
- .getMonitorInstanceName() + " LWSH for remote server "
- + this.serverId + " connected to:"
+ TRACER.debugInfo("In " + getLocalRSMonitorInstanceName()
+ + " LWSH for remote server " + this.serverId + " connected to:"
+ this.replServerHandler.getMonitorInstanceName() + " stop");
DirectoryServer.deregisterMonitorProvider(this);
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index ed652c7..f55298c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -280,32 +280,13 @@
* unlock memory tree
* restart as usual
* load this change on the delayList
- *
*/
- SortedSet<ReplicationIterator> iteratorSortedSet =
- new TreeSet<ReplicationIterator>(
- new ReplicationIteratorComparator());
+ SortedSet<ReplicationIterator> iteratorSortedSet = null;
try
{
- /* fill the lateQueue */
- for (int serverId : replicationServerDomain.getServers())
- {
- ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
- ReplicationIterator iterator = replicationServerDomain
- .getChangelogIterator(serverId, lastCsn);
- if (iterator != null)
- {
- if (iterator.getChange() != null)
- {
- iteratorSortedSet.add(iterator);
- }
- else
- {
- iterator.releaseCursor();
- }
- }
- }
+ iteratorSortedSet = collectAllIteratorsWithChanges();
+ /* fill the lateQueue */
// The loop below relies on the fact that it is sorted based
// on the currentChange of each iterator to consider the next
// change across all servers.
@@ -320,22 +301,12 @@
ReplicationIterator iterator = iteratorSortedSet.first();
iteratorSortedSet.remove(iterator);
lateQueue.add(iterator.getChange());
- if (iterator.next())
- {
- iteratorSortedSet.add(iterator);
- }
- else
- {
- iterator.releaseCursor();
- }
+ addIteratorIfNotEmpty(iteratorSortedSet, iterator);
}
}
finally
{
- for (ReplicationIterator iterator : iteratorSortedSet)
- {
- iterator.releaseCursor();
- }
+ releaseAllIterators(iteratorSortedSet);
}
/*
@@ -343,7 +314,6 @@
* messages in the replication log so the remote serevr is not
* late anymore.
*/
-
if (lateQueue.isEmpty())
{
synchronized (msgQueue)
@@ -430,6 +400,19 @@
return null;
}
+ private void addIteratorIfNotEmpty(SortedSet<ReplicationIterator> iterators,
+ ReplicationIterator iter)
+ {
+ if (iter.next())
+ {
+ iterators.add(iter);
+ }
+ else
+ {
+ iter.releaseCursor();
+ }
+ }
+
/**
* Get the older Change Number for that server.
* Returns null when the queue is empty.
@@ -450,7 +433,12 @@
}
else
{
- if (lateQueue.isEmpty())
+ if (!lateQueue.isEmpty())
+ {
+ UpdateMsg msg = lateQueue.first();
+ result = msg.getChangeNumber();
+ }
+ else
{
/*
following is false AND lateQueue is empty
@@ -460,36 +448,10 @@
there. So let's take the last change not sent directly from
the db.
*/
- SortedSet<ReplicationIterator> iteratorSortedSet =
- new TreeSet<ReplicationIterator>(
- new ReplicationIteratorComparator());
+ SortedSet<ReplicationIterator> iteratorSortedSet = null;
try
{
- // Build a list of candidates iterator (i.e. db i.e. server)
- for (int serverId : replicationServerDomain.getServers())
- {
- // get the last already sent CN from that server
- ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
- // get an iterator in this server db from that last change
- ReplicationIterator iterator =
- replicationServerDomain.getChangelogIterator(serverId, lastCsn);
- /*
- if that iterator has changes, then it is a candidate
- it is added in the sorted list at a position given by its
- current change (see ReplicationIteratorComparator).
- */
- if (iterator != null)
- {
- if (iterator.getChange() != null)
- {
- iteratorSortedSet.add(iterator);
- }
- else
- {
- iterator.releaseCursor();
- }
- }
- }
+ iteratorSortedSet = collectAllIteratorsWithChanges();
UpdateMsg msg = iteratorSortedSet.first().getChange();
result = msg.getChangeNumber();
} catch (Exception e)
@@ -497,21 +459,58 @@
result = null;
} finally
{
- for (ReplicationIterator iterator : iteratorSortedSet)
- {
- iterator.releaseCursor();
- }
+ releaseAllIterators(iteratorSortedSet);
}
- } else
- {
- UpdateMsg msg = lateQueue.first();
- result = msg.getChangeNumber();
}
}
}
return result;
}
+ private SortedSet<ReplicationIterator> collectAllIteratorsWithChanges()
+ {
+ SortedSet<ReplicationIterator> results =
+ new TreeSet<ReplicationIterator>(new ReplicationIteratorComparator());
+
+ // Build a list of candidates iterator (i.e. db i.e. server)
+ for (int serverId : replicationServerDomain.getServers())
+ {
+ // get the last already sent CN from that server
+ ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
+ // get an iterator in this server db from that last change
+ ReplicationIterator iter =
+ replicationServerDomain.getChangelogIterator(serverId, lastCsn);
+ /*
+ if that iterator has changes, then it is a candidate
+ it is added in the sorted list at a position given by its
+ current change (see ReplicationIteratorComparator).
+ */
+ if (iter != null)
+ {
+ if (iter.getChange() != null)
+ {
+ results.add(iter);
+ }
+ else
+ {
+ iter.releaseCursor();
+ }
+ }
+ }
+ return results;
+ }
+
+ private void releaseAllIterators(SortedSet<ReplicationIterator> iterators)
+ {
+ if (iterators != null)
+ {
+ for (ReplicationIterator iter : iterators)
+ {
+ iter.releaseCursor();
+ }
+ }
+ }
+
/**
* Get the count of updates sent to this server.
* @return The count of update sent to this server.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index e335175..e004623 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -118,7 +118,8 @@
*/
private final Map<Integer, DbHandler> sourceDbHandlers =
new ConcurrentHashMap<Integer, DbHandler>();
- private ReplicationServer replicationServer;
+ /** The ReplicationServer that created the current instance. */
+ private ReplicationServer localReplicationServer;
/** GenerationId management. */
private volatile long generationId = -1;
@@ -217,16 +218,16 @@
* Creates a new ReplicationServerDomain associated to the DN baseDn.
*
* @param baseDn The baseDn associated to the ReplicationServerDomain.
- * @param replicationServer the ReplicationServer that created this
+ * @param localReplicationServer the ReplicationServer that created this
* replicationServer cache.
*/
- public ReplicationServerDomain(
- String baseDn, ReplicationServer replicationServer)
+ public ReplicationServerDomain(String baseDn,
+ ReplicationServer localReplicationServer)
{
this.baseDn = baseDn;
- this.replicationServer = replicationServer;
+ this.localReplicationServer = localReplicationServer;
this.assuredTimeoutTimer = new Timer("Replication server RS("
- + replicationServer.getServerId()
+ + localReplicationServer.getServerId()
+ ") assured timer for domain \"" + baseDn + "\"", true);
DirectoryServer.registerMonitorProvider(this);
@@ -245,9 +246,9 @@
public void put(UpdateMsg update, ServerHandler sourceHandler)
throws IOException
{
-
ChangeNumber cn = update.getChangeNumber();
- int id = cn.getServerId();
+ int serverId = cn.getServerId();
+
sourceHandler.updateServerState(update);
sourceHandler.incrementInCount();
@@ -297,7 +298,7 @@
{
// Unknown assured mode: should never happen
Message errorMsg = ERR_RS_UNKNOWN_ASSURED_MODE.get(
- Integer.toString(replicationServer.getServerId()),
+ Integer.toString(localReplicationServer.getServerId()),
assuredMode.toString(), baseDn, update.toString());
logError(errorMsg);
assuredMessage = false;
@@ -308,40 +309,11 @@
}
}
- // look for the dbHandler that is responsible for the LDAP server which
- // generated the change.
- DbHandler dbHandler;
- synchronized (sourceDbHandlers)
+ if (!publishMessage(update, serverId))
{
- dbHandler = sourceDbHandlers.get(id);
- if (dbHandler == null)
- {
- try
- {
- dbHandler = replicationServer.newDbHandler(id, baseDn);
- generationIdSavedStatus = true;
- } catch (ChangelogException e)
- {
- /*
- * Because of database problem we can't save any more changes
- * from at least one LDAP server.
- * This replicationServer therefore can't do it's job properly anymore
- * and needs to close all its connections and shutdown itself.
- */
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- replicationServer.shutdown();
- return;
- }
- sourceDbHandlers.put(id, dbHandler);
- }
+ return;
}
- // Publish the messages to the source handler
- dbHandler.add(update);
-
List<Integer> expectedServers = null;
if (assuredMessage)
{
@@ -363,7 +335,7 @@
// times out)
AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(cn);
assuredTimeoutTimer.schedule(assuredTimeoutTask,
- replicationServer.getAssuredTimeout());
+ localReplicationServer.getAssuredTimeout());
// Purge timer every 100 treated messages
assuredTimeoutTimerPurgeCounter++;
if ((assuredTimeoutTimerPurgeCounter % 100) == 0)
@@ -408,8 +380,9 @@
if (debugEnabled())
{
TRACER.debugInfo("In Replication Server "
- + replicationServer.getReplicationPort() + " " + baseDn + " "
- + replicationServer.getServerId() + " for dn " + baseDn
+ + localReplicationServer.getReplicationPort() + " " + baseDn
+ + " "
+ + localReplicationServer.getServerId() + " for dn " + baseDn
+ ", update " + update.getChangeNumber()
+ " will not be sent to replication server "
+ handler.getServerId() + " with generation id "
@@ -464,7 +437,7 @@
}
if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
{
- TRACER.debugInfo("In RS " + replicationServer.getServerId()
+ TRACER.debugInfo("In RS " + localReplicationServer.getServerId()
+ " for dn " + baseDn + ", update " + update.getChangeNumber()
+ " will not be sent to directory server "
+ handler.getServerId() + " as it is in full update");
@@ -484,6 +457,44 @@
}
}
+ private boolean publishMessage(UpdateMsg update, int serverId)
+ {
+ // look for the dbHandler that is responsible for the LDAP server which
+ // generated the change.
+ DbHandler dbHandler;
+ synchronized (sourceDbHandlers)
+ {
+ dbHandler = sourceDbHandlers.get(serverId);
+ if (dbHandler == null)
+ {
+ try
+ {
+ dbHandler = localReplicationServer.newDbHandler(serverId, baseDn);
+ generationIdSavedStatus = true;
+ } catch (ChangelogException e)
+ {
+ /*
+ * Because of database problem we can't save any more changes
+ * from at least one LDAP server.
+ * This replicationServer therefore can't do it's job properly anymore
+ * and needs to close all its connections and shutdown itself.
+ */
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+ mb.append(stackTraceToSingleLineString(e));
+ logError(mb.toMessage());
+ localReplicationServer.shutdown();
+ return false;
+ }
+ sourceDbHandlers.put(serverId, dbHandler);
+ }
+ }
+
+ // Publish the messages to the source handler
+ dbHandler.add(update);
+ return true;
+ }
+
private NotAssuredUpdateMsg addUpdate(ServerHandler handler,
UpdateMsg update, NotAssuredUpdateMsg notAssuredUpdate,
boolean assuredMessage, List<Integer> expectedServers)
@@ -557,7 +568,7 @@
UpdateMsg update, ServerHandler sourceHandler) throws IOException
{
ChangeNumber cn = update.getChangeNumber();
- byte groupId = replicationServer.getGroupId();
+ byte groupId = localReplicationServer.getGroupId();
byte sourceGroupId = sourceHandler.getGroupId();
List<Integer> expectedServers = new ArrayList<Integer>();
List<Integer> wrongStatusServers = new ArrayList<Integer>();
@@ -642,13 +653,13 @@
ChangeNumber cn = update.getChangeNumber();
boolean interestedInAcks = false;
byte safeDataLevel = update.getSafeDataLevel();
- byte groupId = replicationServer.getGroupId();
+ byte groupId = localReplicationServer.getGroupId();
byte sourceGroupId = sourceHandler.getGroupId();
if (safeDataLevel < (byte) 1)
{
// Should never happen
Message errorMsg = ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL.get(
- Integer.toString(replicationServer.getServerId()),
+ Integer.toString(localReplicationServer.getServerId()),
Byte.toString(safeDataLevel), baseDn, update.toString());
logError(errorMsg);
} else if (sourceGroupId == groupId
@@ -799,7 +810,7 @@
*/
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_RS_ERROR_SENDING_ACK.get(
- Integer.toString(replicationServer.getServerId()),
+ Integer.toString(localReplicationServer.getServerId()),
Integer.toString(origServer.getServerId()),
cn.toString(), baseDn));
mb.append(stackTraceToSingleLineString(e));
@@ -862,7 +873,7 @@
ServerHandler origServer = expectedAcksInfo.getRequesterServer();
if (debugEnabled())
{
- TRACER.debugInfo("In RS " + replicationServer.getServerId()
+ TRACER.debugInfo("In RS " + localReplicationServer.getServerId()
+ " for "+ baseDn
+ ", sending timeout for assured update with change "
+ " number " + cn + " to server id "
@@ -879,7 +890,7 @@
*/
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_RS_ERROR_SENDING_ACK.get(
- Integer.toString(replicationServer.getServerId()),
+ Integer.toString(localReplicationServer.getServerId()),
Integer.toString(origServer.getServerId()),
cn.toString(), baseDn));
mb.append(stackTraceToSingleLineString(e));
@@ -987,7 +998,7 @@
{
// looks like two connected LDAP servers have the same serverId
Message message = ERR_DUPLICATE_SERVER_ID.get(
- replicationServer.getMonitorInstanceName(),
+ localReplicationServer.getMonitorInstanceName(),
directoryServers.get(handler.getServerId()).toString(),
handler.toString(), handler.getServerId());
logError(message);
@@ -1007,7 +1018,8 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ TRACER.debugInfo("In "
+ + this.localReplicationServer.getMonitorInstanceName()
+ " domain=" + this + " stopServer() on the server handler "
+ handler.getMonitorInstanceName());
}
@@ -1045,7 +1057,8 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
+ TRACER.debugInfo("In "
+ + localReplicationServer.getMonitorInstanceName()
+ " remote server " + handler.getMonitorInstanceName()
+ " is the last RS/DS to be stopped:"
+ " stopping monitoring publisher");
@@ -1078,7 +1091,7 @@
if (debugEnabled())
{
TRACER.debugInfo("In "
- + replicationServer.getMonitorInstanceName()
+ + localReplicationServer.getMonitorInstanceName()
+ " remote server " + handler.getMonitorInstanceName()
+ " is the last DS to be stopped: stopping status analyzer");
}
@@ -1128,7 +1141,8 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ TRACER.debugInfo("In "
+ + this.localReplicationServer.getMonitorInstanceName()
+ " domain=" + this + " stopServer() on the message handler "
+ handler.getMonitorInstanceName());
}
@@ -1207,8 +1221,8 @@
if (debugEnabled())
{
TRACER.debugInfo("In RS "
- + this.replicationServer.getMonitorInstanceName() + " for " + baseDn
- + " " + " mayResetGenerationId generationIdSavedStatus="
+ + this.localReplicationServer.getMonitorInstanceName()
+ + " for " + baseDn + " mayResetGenerationId generationIdSavedStatus="
+ generationIdSavedStatus);
}
@@ -1225,7 +1239,7 @@
if (debugEnabled())
{
TRACER.debugInfo("In RS "
- + this.replicationServer.getMonitorInstanceName() + " for "
+ + this.localReplicationServer.getMonitorInstanceName() + " for "
+ baseDn + " " + " mayResetGenerationId skip RS"
+ rsh.getMonitorInstanceName() + " that has different genId");
}
@@ -1236,7 +1250,7 @@
if (debugEnabled())
{
TRACER.debugInfo("In RS "
- + this.replicationServer.getMonitorInstanceName()
+ + this.localReplicationServer.getMonitorInstanceName()
+ " for "+ baseDn + " mayResetGenerationId RS"
+ rsh.getMonitorInstanceName()
+ " has servers connected to it"
@@ -1252,7 +1266,7 @@
if (debugEnabled())
{
TRACER.debugInfo("In RS "
- + this.replicationServer.getMonitorInstanceName() + " for "
+ + this.localReplicationServer.getMonitorInstanceName() + " for "
+ baseDn + " "
+ " has servers connected to it - will not reset generationId");
}
@@ -1292,7 +1306,7 @@
// looks like two replication servers have the same serverId
// log an error message and drop this connection.
Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get(
- replicationServer.getMonitorInstanceName(), oldHandler.
+ localReplicationServer.getMonitorInstanceName(), oldHandler.
getServerAddressURL(), handler.getServerAddressURL(),
handler.getServerId());
throw new DirectoryException(ResultCode.OTHER, message);
@@ -1372,12 +1386,12 @@
* and locks used by the ReplicationIterator.
*
* @param serverId Identifier of the server for which the iterator is created.
- * @param changeNumber Starting point for the iterator.
+ * @param startAfterCN Starting point for the iterator.
* @return the created ReplicationIterator. Null when no DB is available
* for the provided server Id.
*/
public ReplicationIterator getChangelogIterator(int serverId,
- ChangeNumber changeNumber)
+ ChangeNumber startAfterCN)
{
DbHandler handler = sourceDbHandlers.get(serverId);
if (handler == null)
@@ -1388,7 +1402,7 @@
ReplicationIterator it;
try
{
- it = handler.generateIterator(changeNumber);
+ it = handler.generateIterator(startAfterCN);
}
catch (Exception e)
{
@@ -1535,14 +1549,15 @@
}
/**
- * Processes a message coming from one server in the topology
- * and potentially forwards it to one or all other servers.
+ * Processes a message coming from one server in the topology and potentially
+ * forwards it to one or all other servers.
*
- * @param msg The message received and to be processed.
- * @param senderHandler The server handler of the server that emitted
- * the message.
+ * @param msg
+ * The message received and to be processed.
+ * @param msgEmitter
+ * The server handler of the server that emitted the message.
*/
- public void process(RoutableMsg msg, ServerHandler senderHandler)
+ public void process(RoutableMsg msg, ServerHandler msgEmitter)
{
// Test the message for which a ReplicationServer is expected
// to be the destination
@@ -1551,158 +1566,176 @@
!(msg instanceof InitializeRcvAckMsg) &&
!(msg instanceof EntryMsg) &&
!(msg instanceof DoneMsg) &&
- (msg.getDestination() == this.replicationServer.getServerId()))
+ (msg.getDestination() == this.localReplicationServer.getServerId()))
{
if (msg instanceof ErrorMsg)
{
ErrorMsg errorMsg = (ErrorMsg) msg;
- logError(ERR_ERROR_MSG_RECEIVED.get(
- errorMsg.getDetails()));
+ logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails()));
} else if (msg instanceof MonitorRequestMsg)
{
- // If the request comes from a Directory Server we need to
- // build the full list of all servers in the topology
- // and send back a MonitorMsg with the full list of all the servers
- // in the topology.
- if (senderHandler.isDataServer())
- {
- // Monitoring information requested by a DS
- MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
- msg.getDestination(), msg.getSenderID(), monitorData);
-
- if (monitorMsg != null)
- {
- try
- {
- senderHandler.send(monitorMsg);
- }
- catch (IOException e)
- {
- // the connection was closed.
- }
- }
- return;
- } else
- {
- // Monitoring information requested by a RS
- MonitorMsg monitorMsg =
- createLocalTopologyMonitorMsg(msg.getDestination(),
- msg.getSenderID());
-
- if (monitorMsg != null)
- {
- try
- {
- senderHandler.send(monitorMsg);
- } catch (Exception e)
- {
- // We log the error. The requestor will detect a timeout or
- // any other failure on the connection.
- logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
- Integer.toString(msg.getDestination())));
- }
- }
- }
+ replyWithMonitorMsg(msg, msgEmitter);
} else if (msg instanceof MonitorMsg)
{
MonitorMsg monitorMsg = (MonitorMsg) msg;
- receivesMonitorDataResponse(monitorMsg, senderHandler.getServerId());
+ receivesMonitorDataResponse(monitorMsg, msgEmitter.getServerId());
} else
{
- logError(NOTE_ERR_ROUTING_TO_SERVER.get(
- msg.getClass().getCanonicalName()));
-
- MessageBuilder mb1 = new MessageBuilder();
- mb1.append(
- NOTE_ERR_ROUTING_TO_SERVER.get(msg.getClass().getCanonicalName()));
- mb1.append("serverID:").append(msg.getDestination());
- ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), mb1.toMessage());
- try
- {
- senderHandler.send(errMsg);
- } catch (IOException ioe1)
- {
- // an error happened on the sender session trying to recover
- // from an error on the receiver session.
- // Not much more we can do at this point.
- }
+ replyWithUnroutableMsgType(msgEmitter, msg);
}
return;
}
- List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
-
- if (servers.isEmpty())
+ List<ServerHandler> servers = getDestinationServers(msg, msgEmitter);
+ if (!servers.isEmpty())
{
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
- this.baseDn, Integer.toString(msg.getDestination())));
- mb.append(" In Replication Server=").append(
- this.replicationServer.getMonitorInstanceName());
- mb.append(" unroutable message =").append(msg.getClass().getSimpleName());
- mb.append(" Details:routing table is empty");
- ErrorMsg errMsg = new ErrorMsg(
- this.replicationServer.getServerId(),
- msg.getSenderID(),
- mb.toMessage());
- logError(mb.toMessage());
+ forwardMsgToAllServers(msg, servers, msgEmitter);
+ }
+ else
+ {
+ replyWithUnreachablePeerMsg(msgEmitter, msg);
+ }
+ }
+
+ private void replyWithMonitorMsg(RoutableMsg msg, ServerHandler msgEmitter)
+ {
+ /*
+ * If the request comes from a Directory Server we need to build the full
+ * list of all servers in the topology and send back a MonitorMsg with the
+ * full list of all the servers in the topology.
+ */
+ if (msgEmitter.isDataServer())
+ {
+ // Monitoring information requested by a DS
+ MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
+ msg.getDestination(), msg.getSenderID(), monitorData);
try
{
- senderHandler.send(errMsg);
- } catch (IOException ioe)
- {
- // TODO Handle error properly (sender timeout in addition)
- /*
- * An error happened trying to send an error msg to this server.
- * Log an error and close the connection to this server.
- */
- MessageBuilder mb2 = new MessageBuilder();
- mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
- mb2.append(stackTraceToSingleLineString(ioe));
- logError(mb2.toMessage());
- stopServer(senderHandler, false);
+ msgEmitter.send(monitorMsg);
}
- } else
+ catch (IOException e)
+ {
+ // the connection was closed.
+ }
+ }
+ else
{
- for (ServerHandler targetHandler : servers)
+ // Monitoring information requested by a RS
+ MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
+ msg.getDestination(), msg.getSenderID());
+
+ if (monitorMsg != null)
{
try
{
- targetHandler.send(msg);
- } catch (IOException ioe)
+ msgEmitter.send(monitorMsg);
+ }
+ catch (IOException e)
{
- /*
- * An error happened trying the send a routable message
- * to its destination server.
- * Send back an error to the originator of the message.
- */
- MessageBuilder mb1 = new MessageBuilder();
- mb1.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
- this.baseDn, Integer.toString(msg.getDestination())));
- mb1.append(" unroutable message =" + msg.getClass().getSimpleName());
- mb1.append(" Details: " + ioe.getLocalizedMessage());
- ErrorMsg errMsg = new ErrorMsg(
- msg.getSenderID(), mb1.toMessage());
- logError(mb1.toMessage());
- try
- {
- senderHandler.send(errMsg);
- } catch (IOException ioe1)
- {
- // an error happened on the sender session trying to recover
- // from an error on the receiver session.
- // We don't have much solution left beside closing the sessions.
- stopServer(senderHandler, false);
- stopServer(targetHandler, false);
- }
- // TODO Handle error properly (sender timeout in addition)
+ // We log the error. The requestor will detect a timeout or
+ // any other failure on the connection.
+ logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(Integer.toString(msg
+ .getDestination())));
}
}
}
-
}
+ private void replyWithUnroutableMsgType(ServerHandler msgEmitter,
+ RoutableMsg msg)
+ {
+ String msgClassname = msg.getClass().getCanonicalName();
+ logError(NOTE_ERR_ROUTING_TO_SERVER.get(msgClassname));
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(NOTE_ERR_ROUTING_TO_SERVER.get(msgClassname));
+ mb.append("serverID:").append(msg.getDestination());
+ ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), mb.toMessage());
+ try
+ {
+ msgEmitter.send(errMsg);
+ }
+ catch (IOException ignored)
+ {
+ // an error happened on the sender session trying to recover
+ // from an error on the receiver session.
+ // Not much more we can do at this point.
+ }
+ }
+
+ private void replyWithUnreachablePeerMsg(ServerHandler msgEmitter,
+ RoutableMsg msg)
+ {
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
+ this.baseDn, Integer.toString(msg.getDestination())));
+ mb.append(" In Replication Server=").append(
+ this.localReplicationServer.getMonitorInstanceName());
+ mb.append(" unroutable message =").append(msg.getClass().getSimpleName());
+ mb.append(" Details:routing table is empty");
+ final Message message = mb.toMessage();
+ logError(message);
+
+ ErrorMsg errMsg = new ErrorMsg(this.localReplicationServer.getServerId(),
+ msg.getSenderID(), message);
+ try
+ {
+ msgEmitter.send(errMsg);
+ }
+ catch (IOException ignored)
+ {
+ // TODO Handle error properly (sender timeout in addition)
+ /*
+ * An error happened trying to send an error msg to this server.
+ * Log an error and close the connection to this server.
+ */
+ MessageBuilder mb2 = new MessageBuilder();
+ mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
+ mb2.append(stackTraceToSingleLineString(ignored));
+ logError(mb2.toMessage());
+ stopServer(msgEmitter, false);
+ }
+ }
+
+ private void forwardMsgToAllServers(RoutableMsg msg,
+ List<ServerHandler> servers, ServerHandler msgEmitter)
+ {
+ for (ServerHandler targetHandler : servers)
+ {
+ try
+ {
+ targetHandler.send(msg);
+ } catch (IOException ioe)
+ {
+ /*
+ * An error happened trying to send a routable message to its
+ * destination server.
+ * Send back an error to the originator of the message.
+ */
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
+ this.baseDn, Integer.toString(msg.getDestination())));
+ mb.append(" unroutable message =" + msg.getClass().getSimpleName());
+ mb.append(" Details: " + ioe.getLocalizedMessage());
+ final Message message = mb.toMessage();
+ logError(message);
+
+ ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), message);
+ try
+ {
+ msgEmitter.send(errMsg);
+ } catch (IOException ioe1)
+ {
+ // an error happened on the sender session trying to recover
+ // from an error on the receiver session.
+ // We don't have much solution left beside closing the sessions.
+ stopServer(msgEmitter, false);
+ stopServer(targetHandler, false);
+ }
+ // TODO Handle error properly (sender timeout in addition)
+ }
+ }
+ }
/**
* Creates a new monitor message including monitoring information for the
@@ -1720,13 +1753,11 @@
public MonitorMsg createGlobalTopologyMonitorMsg(
int sender, int destination, MonitorData monitorData)
{
- MonitorMsg returnMsg =
- new MonitorMsg(sender, destination);
+ final MonitorMsg returnMsg = new MonitorMsg(sender, destination);
returnMsg.setReplServerDbState(getDbServerState());
- // Add the informations about the Replicas currently in
- // the topology.
+ // Add the informations about the Replicas currently in the topology.
Iterator<Integer> it = monitorData.ldapIterator();
while (it.hasNext())
{
@@ -1736,8 +1767,7 @@
monitorData.getApproxFirstMissingDate(replicaId), true);
}
- // Add the information about the Replication Servers
- // currently in the topology.
+ // Add the information about the RSs currently in the topology.
it = monitorData.rsIterator();
while (it.hasNext())
{
@@ -1787,16 +1817,14 @@
for (DataServerHandler lsh : this.directoryServers.values())
{
monitorMsg.setServerState(lsh.getServerId(),
- lsh.getServerState(), lsh.getApproxFirstMissingDate(),
- true);
+ lsh.getServerState(), lsh.getApproxFirstMissingDate(), true);
}
// Same for the connected RS
for (ReplicationServerHandler rsh : this.replicationServers.values())
{
monitorMsg.setServerState(rsh.getServerId(),
- rsh.getServerState(), rsh.getApproxFirstMissingDate(),
- false);
+ rsh.getServerState(), rsh.getApproxFirstMissingDate(), false);
}
// Populate the RS state in the msg from the DbState
@@ -1821,15 +1849,12 @@
stopAllServers(true);
- stopDbHandlers();
+ shutdownDbHandlers();
}
- /**
- * Stop the dbHandlers .
- */
- private void stopDbHandlers()
+ /** Shutdown all the dbHandlers. */
+ private void shutdownDbHandlers()
{
- // Shutdown the dbHandlers
synchronized (sourceDbHandlers)
{
for (DbHandler dbHandler : sourceDbHandlers.values())
@@ -1964,9 +1989,7 @@
// Create info for the local RS
List<RSInfo> rsInfos = new ArrayList<RSInfo>();
- RSInfo localRSInfo = toRSInfo(replicationServer, generationId);
- rsInfos.add(localRSInfo);
-
+ rsInfos.add(toRSInfo(localReplicationServer, generationId));
return new TopologyMsg(dsInfos, rsInfos);
}
@@ -1982,10 +2005,8 @@
*/
public TopologyMsg createTopologyMsgForDS(int destDsId)
{
- List<DSInfo> dsInfos = new ArrayList<DSInfo>();
- List<RSInfo> rsInfos = new ArrayList<RSInfo>();
-
// Go through every DSs (except recipient of msg)
+ List<DSInfo> dsInfos = new ArrayList<DSInfo>();
for (DataServerHandler serverHandler : directoryServers.values())
{
if (serverHandler.getServerId() == destDsId)
@@ -1995,15 +2016,15 @@
dsInfos.add(serverHandler.toDSInfo());
}
+
+ List<RSInfo> rsInfos = new ArrayList<RSInfo>();
// Add our own info (local RS)
- RSInfo localRSInfo = toRSInfo(replicationServer, generationId);
- rsInfos.add(localRSInfo);
+ rsInfos.add(toRSInfo(localReplicationServer, generationId));
// Go through every peer RSs (and get their connected DSs), also add info
// for RSs
for (ReplicationServerHandler serverHandler : replicationServers.values())
{
- // Put RS info
rsInfos.add(serverHandler.toRSInfo());
serverHandler.addDSInfos(dsInfos);
@@ -2354,11 +2375,11 @@
logError(mb.toMessage());
}
}
- stopDbHandlers();
+ shutdownDbHandlers();
}
try
{
- replicationServer.clearGenerationId(baseDn);
+ localReplicationServer.clearGenerationId(baseDn);
} catch (Exception e)
{
// TODO: i18n
@@ -2381,7 +2402,8 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ TRACER.debugInfo("In "
+ + this.localReplicationServer.getMonitorInstanceName()
+ " baseDN=" + baseDn + " isDegraded serverId=" + serverId
+ " given local generation Id=" + this.generationId);
}
@@ -2398,7 +2420,8 @@
if (debugEnabled())
{
- TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+ TRACER.debugInfo("In "
+ + this.localReplicationServer.getMonitorInstanceName()
+ " baseDN=" + baseDn + " Compute degradation of serverId="
+ serverId + " LS server generation Id=" + handler.getGenerationId());
}
@@ -2411,7 +2434,7 @@
*/
public ReplicationServer getReplicationServer()
{
- return replicationServer;
+ return localReplicationServer;
}
/**
@@ -2557,7 +2580,7 @@
int serverId = rs.getServerId();
MonitorRequestMsg msg = new MonitorRequestMsg(
- this.replicationServer.getServerId(), serverId);
+ this.localReplicationServer.getServerId(), serverId);
try
{
rs.send(msg);
@@ -2684,7 +2707,7 @@
// - from our own local db state
// - whatever they are directly or indirectly connected
ServerState dbServerState = getDbServerState();
- pendingMonitorData.setRSState(replicationServer.getServerId(),
+ pendingMonitorData.setRSState(localReplicationServer.getServerId(),
dbServerState);
for (int serverId : dbServerState) {
ChangeNumber storedCN = dbServerState.getChangeNumber(serverId);
@@ -2744,7 +2767,7 @@
while (rsidIterator.hasNext())
{
int rsid = rsidIterator.next();
- if (rsid == replicationServer.getServerId())
+ if (rsid == localReplicationServer.getServerId())
{
// this is the latency of the remote RSi regarding the current RS
// let's update the fmd of my connected LS
@@ -2895,7 +2918,7 @@
if (statusAnalyzer == null)
{
int degradedStatusThreshold =
- replicationServer.getDegradedStatusThreshold();
+ localReplicationServer.getDegradedStatusThreshold();
if (degradedStatusThreshold > 0) // 0 means no status analyzer
{
statusAnalyzer = new StatusAnalyzer(this, degradedStatusThreshold);
@@ -2946,7 +2969,7 @@
if (monitoringPublisher == null)
{
long period =
- replicationServer.getMonitoringPublisherPeriod();
+ localReplicationServer.getMonitoringPublisherPeriod();
if (period > 0) // 0 means no monitoring publisher
{
monitoringPublisher = new MonitoringPublisher(this, period);
@@ -3004,8 +3027,8 @@
@Override
public String getMonitorInstanceName()
{
- return "Replication server RS(" + replicationServer.getServerId() + ") "
- + replicationServer.getServerURL() + ",cn="
+ return "Replication server RS(" + localReplicationServer.getServerId()
+ + ") " + localReplicationServer.getServerURL() + ",cn="
+ baseDn.replace(',', '_').replace('=', '_') + ",cn=Replication";
}
@@ -3018,9 +3041,9 @@
// publish the server id and the port number.
List<Attribute> attributes = new ArrayList<Attribute>();
attributes.add(Attributes.create("replication-server-id",
- String.valueOf(replicationServer.getServerId())));
+ String.valueOf(localReplicationServer.getServerId())));
attributes.add(Attributes.create("replication-server-port",
- String.valueOf(replicationServer.getReplicationPort())));
+ String.valueOf(localReplicationServer.getReplicationPort())));
// Add all the base DNs that are known by this replication server.
attributes.add(Attributes.create("domain-name", baseDn));
@@ -3032,7 +3055,7 @@
MonitorData md = getDomainMonitorData();
// Missing changes
- long missingChanges = md.getMissingChangesRS(replicationServer
+ long missingChanges = md.getMissingChangesRS(localReplicationServer
.getServerId());
attributes.add(Attributes.create("missing-changes",
String.valueOf(missingChanges)));
@@ -3201,30 +3224,13 @@
}
*/
- boolean serverIdConnected = false;
- if (directoryServers.containsKey(serverId))
- {
- serverIdConnected = true;
- }
- else
- {
- // not directly connected
- for (ReplicationServerHandler rsh : replicationServers.values())
- {
- if (rsh.isRemoteLDAPServer(serverId))
- {
- serverIdConnected = true;
- break;
- }
- }
- }
- if (!serverIdConnected)
+ if (!isServerConnected(serverId))
{
if (debugEnabled())
{
TRACER.debugInfo("In " + "Replication Server "
- + replicationServer.getReplicationPort() + " " + baseDn + " "
- + replicationServer.getServerId() + " Server " + serverId
+ + localReplicationServer.getReplicationPort() + " " + baseDn + " "
+ + localReplicationServer.getServerId() + " Server " + serverId
+ " is not considered for eligibility ... potentially down");
}
continue;
@@ -3246,13 +3252,31 @@
if (debugEnabled())
{
TRACER.debugInfo("In Replication Server "
- + replicationServer.getReplicationPort() + " " + baseDn + " "
- + replicationServer.getServerId()
+ + localReplicationServer.getReplicationPort() + " " + baseDn + " "
+ + localReplicationServer.getServerId()
+ " getEligibleCN() returns result =" + eligibleCN);
}
return eligibleCN;
}
+ private boolean isServerConnected(int serverId)
+ {
+ if (directoryServers.containsKey(serverId))
+ {
+ return true;
+ }
+
+ // not directly connected
+ for (ReplicationServerHandler rsHandler : replicationServers.values())
+ {
+ if (rsHandler.isRemoteLDAPServer(serverId))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* Processes a ChangeTimeHeartbeatMsg received, by storing the CN (timestamp)
@@ -3299,8 +3323,8 @@
TRACER.debugCaught(DebugLogLevel.ERROR, e);
logError(ERR_CHANGELOG_ERROR_SENDING_MSG
.get("Replication Server "
- + replicationServer.getReplicationPort() + " "
- + baseDn + " " + replicationServer.getServerId()));
+ + localReplicationServer.getReplicationPort() + " "
+ + baseDn + " " + localReplicationServer.getServerId()));
stopServer(rsHandler, false);
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index afe7fb6..6f96bda 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -27,11 +27,6 @@
*/
package org.opends.server.replication.server;
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.replication.protocol.ProtocolVersion.*;
-
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -46,6 +41,11 @@
import org.opends.server.replication.protocol.*;
import org.opends.server.types.*;
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.protocol.ProtocolVersion.*;
+
/**
* This class defines a server handler, which handles all interaction with a
* peer replication server.
@@ -82,10 +82,8 @@
generationId = inReplServerStartMsg.getGenerationId();
serverId = inReplServerStartMsg.getServerId();
serverURL = inReplServerStartMsg.getServerURL();
- int separator = serverURL.lastIndexOf(':');
- serverAddressURL =
- session.getRemoteAddress() + ":" + serverURL.substring(separator +
- 1);
+ final String port = serverURL.substring(serverURL.lastIndexOf(':') + 1);
+ serverAddressURL = session.getRemoteAddress() + ":" + port;
setBaseDNAndDomain(inReplServerStartMsg.getBaseDn(), false);
setInitialServerState(inReplServerStartMsg.getServerState());
setSendWindowSize(inReplServerStartMsg.getWindowSize());
@@ -119,8 +117,7 @@
getReplicationServerId(), getReplicationServerURL(), getBaseDN(),
maxRcvWindow, replicationServerDomain.getDbServerState(),
localGenerationId, sslEncryption,
- getLocalGroupId(), replicationServerDomain.getReplicationServer()
- .getDegradedStatusThreshold());
+ getLocalGroupId(), replicationServer.getDegradedStatusThreshold());
send(outReplServerStartMsg);
return outReplServerStartMsg;
}
@@ -296,7 +293,7 @@
finally
{
// Release domain
- if ((replicationServerDomain != null) &&
+ if (replicationServerDomain != null &&
replicationServerDomain.hasLock())
replicationServerDomain.release();
}
@@ -374,11 +371,9 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() +
- this + " RS V1 with serverID=" + serverId +
- " is connected with the right generation ID");
+ TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
+ + " " + this + " RS V1 with serverID=" + serverId
+ + " is connected with the right generation ID");
}
} else
{
@@ -420,10 +415,9 @@
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
- Message errMessage = ERR_RS_DISCONNECTED_DURING_HANDSHAKE.get(Integer
- .toString(inReplServerStartMsg.getServerId()), Integer
- .toString(replicationServerDomain.getReplicationServer()
- .getServerId()));
+ Message errMessage = ERR_RS_DISCONNECTED_DURING_HANDSHAKE.get(
+ Integer.toString(inReplServerStartMsg.getServerId()),
+ Integer.toString(replicationServer.getServerId()));
abortStart(errMessage);
}
catch (DirectoryException e)
@@ -444,7 +438,7 @@
}
finally
{
- if ((replicationServerDomain != null) &&
+ if (replicationServerDomain != null &&
replicationServerDomain.hasLock())
replicationServerDomain.release();
}
@@ -489,12 +483,10 @@
// connection attempt.
return null;
}
- else
- {
- Message message = ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get(msg
- .getClass().getCanonicalName(), "TopologyMsg");
- throw new DirectoryException(ResultCode.OTHER, message);
- }
+
+ Message message = ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get(
+ msg.getClass().getCanonicalName(), "TopologyMsg");
+ throw new DirectoryException(ResultCode.OTHER, message);
}
// Remote RS sent his topo msg
@@ -518,10 +510,9 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " +
- replicationServerDomain.getReplicationServer().
- getMonitorInstanceName() + " RS with serverID=" + serverId +
- " is connected with the right generation ID, same as local ="
+ TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
+ + " RS with serverID=" + serverId
+ + " is connected with the right generation ID, same as local ="
+ generationId);
}
}
@@ -541,42 +532,40 @@
{
if (localGenerationId > 0)
{ // the local RS is initialized
- if (generationId > 0)
- { // the remote RS is initialized.
- // If not, there's nothing to do anyway.
- if (generationId != localGenerationId)
- {
- /* Either:
- *
- * 1) The 2 RS have different generationID
- * replicationServerDomain.getGenerationIdSavedStatus() == true
- *
- * if the present RS has received changes regarding its
- * gen ID and so won't change without a reset
- * then we are just degrading the peer.
- *
- * 2) This RS has never received any changes for the current
- * generation ID.
- *
- * Example case:
- * - we are in RS1
- * - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
- * - RS1 has genId1 from LS1 /genId1 comes from data in suffix
- * - we are in RS1 and we receive a START msg from RS2
- * - Each RS keeps its genID / is degraded and when LS2
- * will be populated from LS1 everything will become ok.
- *
- * Issue:
- * FIXME : Would it be a good idea in some cases to just set the
- * gen ID received from the peer RS specially if the peer has a
- * non null state and we have a null state ?
- * replicationServerDomain.setGenerationId(generationId, false);
- */
- Message message = WARN_BAD_GENERATION_ID_FROM_RS.get(
- serverId, session.getReadableRemoteAddress(), generationId,
- getBaseDN(), getReplicationServerId(), localGenerationId);
- logError(message);
- }
+ if (generationId > 0
+ // the remote RS is initialized. If not, there's nothing to do anyway.
+ && generationId != localGenerationId)
+ {
+ /* Either:
+ *
+ * 1) The 2 RS have different generationID
+ * replicationServerDomain.getGenerationIdSavedStatus() == true
+ *
+ * if the present RS has received changes regarding its
+ * gen ID and so won't change without a reset
+ * then we are just degrading the peer.
+ *
+ * 2) This RS has never received any changes for the current
+ * generation ID.
+ *
+ * Example case:
+ * - we are in RS1
+ * - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
+ * - RS1 has genId1 from LS1 /genId1 comes from data in suffix
+ * - we are in RS1 and we receive a START msg from RS2
+ * - Each RS keeps its genID / is degraded and when LS2
+ * will be populated from LS1 everything will become ok.
+ *
+ * Issue:
+ * FIXME : Would it be a good idea in some cases to just set the
+ * gen ID received from the peer RS specially if the peer has a
+ * non null state and we have a null state ?
+ * replicationServerDomain.setGenerationId(generationId, false);
+ */
+ Message message = WARN_BAD_GENERATION_ID_FROM_RS.get(
+ serverId, session.getReadableRemoteAddress(), generationId,
+ getBaseDN(), getReplicationServerId(), localGenerationId);
+ logError(message);
}
}
else
@@ -655,9 +644,7 @@
groupId = rsInfo.getGroupId();
weight = rsInfo.getWeight();
- /**
- * Store info for DSs connected to the peer RS
- */
+ // Store info for DSs connected to the peer RS
List<DSInfo> dsInfos = topoMsg.getDsList();
synchronized (remoteDirectoryServers)
@@ -688,18 +675,18 @@
* When this handler is connected to a replication server, specifies if
* a wanted server is connected to this replication server.
*
- * @param wantedServer The server we want to know if it is connected
+ * @param serverId The server we want to know if it is connected
* to the replication server represented by this handler.
* @return boolean True is the wanted server is connected to the server
* represented by this handler.
*/
- public boolean isRemoteLDAPServer(int wantedServer)
+ public boolean isRemoteLDAPServer(int serverId)
{
synchronized (remoteDirectoryServers)
{
for (LightweightServerHandler server : remoteDirectoryServers.values())
{
- if (wantedServer == server.getServerId())
+ if (serverId == server.getServerId())
{
return true;
}
@@ -765,9 +752,8 @@
MonitorData md = replicationServerDomain.getDomainMonitorData();
// Missing changes
- long missingChanges = md.getMissingChangesRS(serverId);
attributes.add(Attributes.create("missing-changes",
- String.valueOf(missingChanges)));
+ String.valueOf(md.getMissingChangesRS(serverId))));
/* get the Server State */
AttributeBuilder builder = new AttributeBuilder("server-state");
@@ -791,17 +777,10 @@
{
if (serverId != 0)
{
- StringBuilder builder = new StringBuilder("Replication server RS(");
- builder.append(serverId);
- builder.append(") for domain \"");
- builder.append(replicationServerDomain.getBaseDn());
- builder.append("\"");
- return builder.toString();
+ return "Replication server RS(" + serverId + ") for domain \""
+ + replicationServerDomain.getBaseDn() + "\"";
}
- else
- {
- return "Unknown server";
- }
+ return "Unknown server";
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
index 50362de..0708ad9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -44,8 +44,7 @@
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationIterator;
-import org.opends.server.replication.server.changelog.je.ReplicationDB
- .ReplServerDBCursor;
+import org.opends.server.replication.server.changelog.je.ReplicationDB.*;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.InitializationException;
@@ -265,22 +264,20 @@
* managed by this dbHandler and starting at the position defined
* by a given changeNumber.
*
- * @param changeNumber The position where the iterator must start.
- *
+ * @param startAfterCN The position where the iterator must start.
* @return a new ReplicationIterator that allows to browse the db
* managed by this dbHandler and starting at the position defined
* by a given changeNumber.
- *
* @throws ChangelogException if a database problem happened.
*/
- public ReplicationIterator generateIterator(ChangeNumber changeNumber)
+ public ReplicationIterator generateIterator(ChangeNumber startAfterCN)
throws ChangelogException
{
- if (changeNumber == null)
+ if (startAfterCN == null)
{
flush();
}
- return new JEReplicationIterator(db, changeNumber, this);
+ return new JEReplicationIterator(db, startAfterCN, this);
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationIterator.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationIterator.java
index e09fef5..e8dae9f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationIterator.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationIterator.java
@@ -32,8 +32,7 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationIterator;
-import org.opends.server.replication.server.changelog.je.ReplicationDB
- .ReplServerDBCursor;
+import org.opends.server.replication.server.changelog.je.ReplicationDB.*;
/**
* Berkeley DB JE implementation of IReplicationIterator.
@@ -52,20 +51,20 @@
* releaseCursor() method.
*
* @param db The db where the iterator must be created.
- * @param changeNumber The ChangeNumber after which the iterator must start.
+ * @param startAfterCN The ChangeNumber after which the iterator must start.
* @param dbHandler The associated DbHandler.
* @throws ChangelogException if a database problem happened.
*/
- public JEReplicationIterator(ReplicationDB db, ChangeNumber changeNumber,
+ public JEReplicationIterator(ReplicationDB db, ChangeNumber startAfterCN,
DbHandler dbHandler) throws ChangelogException
{
this.db = db;
this.dbHandler = dbHandler;
- this.lastNonNullCurrentCN = changeNumber;
+ this.lastNonNullCurrentCN = startAfterCN;
try
{
- cursor = db.openReadCursor(changeNumber);
+ cursor = db.openReadCursor(startAfterCN);
}
catch(Exception e)
{
@@ -79,7 +78,7 @@
dbHandler.flush();
// look again in the db
- cursor = db.openReadCursor(changeNumber);
+ cursor = db.openReadCursor(startAfterCN);
if (cursor == null)
{
throw new ChangelogException(Message.raw("no new change"));
--
Gitblit v1.10.0