From 0a9135e3444bbefde6188f456b9c9772a816096d Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 18 Sep 2013 15:17:14 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 125 +++++++++++++++++++++--------------------
1 files changed, 65 insertions(+), 60 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index eef08e4..e0ed2c1 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -75,7 +75,7 @@
*/
public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg>
{
- private final String baseDn;
+ private final DN baseDN;
/**
* The Status analyzer that periodically verifies whether the connected DSs
@@ -172,21 +172,21 @@
private ServerState ctHeartbeatState;
/**
- * Creates a new ReplicationServerDomain associated to the DN baseDn.
+ * Creates a new ReplicationServerDomain associated to the baseDN.
*
- * @param baseDn
- * The baseDn associated to the ReplicationServerDomain.
+ * @param baseDN
+ * The baseDN associated to the ReplicationServerDomain.
* @param localReplicationServer
* the ReplicationServer that created this instance.
*/
- public ReplicationServerDomain(String baseDn,
+ public ReplicationServerDomain(DN baseDN,
ReplicationServer localReplicationServer)
{
- this.baseDn = baseDn;
+ this.baseDN = baseDN;
this.localReplicationServer = localReplicationServer;
this.assuredTimeoutTimer = new Timer("Replication server RS("
+ localReplicationServer.getServerId()
- + ") assured timer for domain \"" + baseDn + "\"", true);
+ + ") assured timer for domain \"" + baseDN + "\"", true);
this.changelogDB = localReplicationServer.getChangelogDB();
DirectoryServer.registerMonitorProvider(this);
@@ -253,7 +253,8 @@
// Unknown assured mode: should never happen
Message errorMsg = ERR_RS_UNKNOWN_ASSURED_MODE.get(
Integer.toString(localReplicationServer.getServerId()),
- assuredMode.toString(), baseDn, update.toString());
+ assuredMode.toString(), baseDN.toNormalizedString(),
+ update.toString());
logError(errorMsg);
assuredMessage = false;
}
@@ -405,7 +406,7 @@
{
try
{
- if (this.changelogDB.publishUpdateMsg(baseDn, serverId, updateMsg))
+ if (this.changelogDB.publishUpdateMsg(baseDN, serverId, updateMsg))
{
/*
* JNR: Matt and I had a hard time figuring out where to put this
@@ -608,7 +609,8 @@
// Should never happen
Message errorMsg = ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL.get(
Integer.toString(localReplicationServer.getServerId()),
- Byte.toString(safeDataLevel), baseDn, update.toString());
+ Byte.toString(safeDataLevel), baseDN.toNormalizedString(),
+ update.toString());
logError(errorMsg);
} else if (sourceGroupId == groupId
// Assured feature does not cross different group IDS
@@ -760,7 +762,7 @@
mb.append(ERR_RS_ERROR_SENDING_ACK.get(
Integer.toString(localReplicationServer.getServerId()),
Integer.toString(origServer.getServerId()),
- csn.toString(), baseDn));
+ csn.toString(), baseDN.toNormalizedString()));
mb.append(" ");
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
@@ -838,7 +840,7 @@
mb.append(ERR_RS_ERROR_SENDING_ACK.get(
Integer.toString(localReplicationServer.getServerId()),
Integer.toString(origServer.getServerId()),
- csn.toString(), baseDn));
+ csn.toString(), baseDN.toNormalizedString()));
mb.append(" ");
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
@@ -1275,7 +1277,7 @@
*/
public Set<Integer> getServerIds()
{
- return changelogDB.getDomainServerIds(baseDn);
+ return changelogDB.getDomainServerIds(baseDN);
}
/**
@@ -1292,7 +1294,7 @@
*/
public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN)
{
- return changelogDB.getCursorFrom(baseDn, serverId, startAfterCSN);
+ return changelogDB.getCursorFrom(baseDN, serverId, startAfterCSN);
}
/**
@@ -1305,7 +1307,7 @@
*/
public long getCount(int serverId, CSN from, CSN to)
{
- return changelogDB.getCount(baseDn, serverId, from, to);
+ return changelogDB.getCount(baseDN, serverId, from, to);
}
/**
@@ -1315,16 +1317,17 @@
*/
public long getChangesCount()
{
- return changelogDB.getDomainChangesCount(baseDn);
+ return changelogDB.getDomainChangesCount(baseDN);
}
/**
- * Get the baseDn.
- * @return Returns the baseDn.
+ * Get the baseDN.
+ *
+ * @return Returns the baseDN.
*/
- public String getBaseDn()
+ public DN getBaseDN()
{
- return baseDn;
+ return baseDN;
}
/**
@@ -1520,7 +1523,7 @@
{
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
- this.baseDn, Integer.toString(msg.getDestination())));
+ baseDN.toNormalizedString(), Integer.toString(msg.getDestination())));
mb.append(" In Replication Server=").append(
this.localReplicationServer.getMonitorInstanceName());
mb.append(" unroutable message =").append(msg.getClass().getSimpleName());
@@ -1567,7 +1570,8 @@
*/
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
- this.baseDn, Integer.toString(msg.getDestination())));
+ baseDN.toNormalizedString(),
+ Integer.toString(msg.getDestination())));
mb.append(" unroutable message =" + msg.getClass().getSimpleName());
mb.append(" Details: " + ioe.getLocalizedMessage());
final Message message = mb.toMessage();
@@ -1698,7 +1702,7 @@
stopAllServers(true);
- changelogDB.shutdownDomain(baseDn);
+ changelogDB.shutdownDomain(baseDN);
}
/**
@@ -1709,7 +1713,7 @@
public ServerState getDbServerState()
{
ServerState serverState = new ServerState();
- for (CSN lastCSN : changelogDB.getDomainLastCSNs(baseDn).values())
+ for (CSN lastCSN : changelogDB.getDomainLastCSNs(baseDN).values())
{
serverState.update(lastCSN);
}
@@ -1722,7 +1726,7 @@
@Override
public String toString()
{
- return "ReplicationServerDomain " + baseDn;
+ return "ReplicationServerDomain " + baseDN;
}
/**
@@ -1755,10 +1759,9 @@
{
if (i == 2)
{
- Message message =
- ERR_EXCEPTION_SENDING_TOPO_INFO
- .get(baseDn, "directory", Integer.toString(dsHandler
- .getServerId()), e.getMessage());
+ Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
+ baseDN.toNormalizedString(), "directory",
+ Integer.toString(dsHandler.getServerId()), e.getMessage());
logError(message);
}
}
@@ -1793,7 +1796,7 @@
if (i == 2)
{
Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
- baseDn, "replication",
+ baseDN.toNormalizedString(), "replication",
Integer.toString(rsHandler.getServerId()), e.getMessage());
logError(message);
}
@@ -1934,9 +1937,8 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " + this + " Receiving ResetGenerationIdMsg from "
- + senderHandler.getServerId() + " for baseDn " + baseDn + ":\n"
- + genIdMsg);
+ debug("Receiving ResetGenerationIdMsg from "
+ + senderHandler.getServerId() + ":\n" + genIdMsg);
}
try
@@ -1965,10 +1967,8 @@
// Order to take a gen id we already have, just ignore
if (debugEnabled())
{
- TRACER.debugInfo("In " + this
- + " Reset generation id requested for baseDn " + baseDn
- + " but generation id was already " + this.generationId + ":\n"
- + genIdMsg);
+ debug("Reset generation id requested but generationId was already "
+ + this.generationId + ":\n" + genIdMsg);
}
}
@@ -1987,8 +1987,8 @@
}
} catch (IOException e)
{
- logError(ERR_EXCEPTION_FORWARDING_RESET_GEN_ID.get(baseDn,
- e.getMessage()));
+ logError(ERR_EXCEPTION_FORWARDING_RESET_GEN_ID.get(
+ baseDN.toNormalizedString(), e.getMessage()));
}
}
@@ -2001,7 +2001,8 @@
dsHandler.changeStatusForResetGenId(newGenId);
} catch (IOException e)
{
- logError(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID.get(baseDn,
+ logError(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID.get(
+ baseDN.toNormalizedString(),
Integer.toString(dsHandler.getServerId()),
e.getMessage()));
}
@@ -2014,7 +2015,8 @@
// treatment.
sendTopoInfoToAll();
- logError(NOTE_RESET_GENERATION_ID.get(baseDn, newGenId));
+ logError(NOTE_RESET_GENERATION_ID.get(baseDN.toNormalizedString(),
+ newGenId));
}
catch(Exception e)
{
@@ -2069,7 +2071,8 @@
sendTopoInfoToAllExcept(senderHandler);
Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
- senderHandler.getServerId(), baseDn, newStatus.toString());
+ senderHandler.getServerId(), baseDN.toNormalizedString(),
+ newStatus.toString());
logError(message);
}
catch(Exception e)
@@ -2114,7 +2117,7 @@
// StatusAnalyzer.
if (debugEnabled())
{
- TRACER.debugInfo("Status analyzer for domain " + baseDn
+ TRACER.debugInfo("Status analyzer for domain " + baseDN
+ " has been interrupted when"
+ " trying to acquire domain lock for changing the status of DS "
+ dsHandler.getServerId());
@@ -2133,7 +2136,7 @@
catch (IOException e)
{
logError(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER
- .get(baseDn,
+ .get(baseDN.toNormalizedString(),
Integer.toString(dsHandler.getServerId()),
e.getMessage()));
}
@@ -2186,10 +2189,10 @@
public void clearDbs()
{
// Reset the localchange and state db for the current domain
- changelogDB.clearDomain(baseDn);
+ changelogDB.clearDomain(baseDN);
try
{
- localReplicationServer.clearGenerationId(baseDn);
+ localReplicationServer.clearGenerationId(baseDN);
}
catch (Exception e)
{
@@ -2285,7 +2288,7 @@
rsHandler.getServerId(),
rsHandler.session.getReadableRemoteAddress(),
rsHandler.getGenerationId(),
- baseDn, getLocalRSServerId(), generationId);
+ baseDN.toNormalizedString(), getLocalRSServerId(), generationId);
logError(message);
ErrorMsg errorMsg = new ErrorMsg(getLocalRSServerId(),
@@ -2494,7 +2497,8 @@
{
return "Replication server RS(" + localReplicationServer.getServerId()
+ ") " + localReplicationServer.getServerURL() + ",cn="
- + baseDn.replace(',', '_').replace('=', '_') + ",cn=Replication";
+ + baseDN.toNormalizedString().replace(',', '_').replace('=', '_')
+ + ",cn=Replication";
}
/**
@@ -2509,9 +2513,10 @@
String.valueOf(localReplicationServer.getServerId())));
attributes.add(Attributes.create("replication-server-port",
String.valueOf(localReplicationServer.getReplicationPort())));
- attributes.add(Attributes.create("domain-name", baseDn));
+ attributes.add(Attributes.create("domain-name",
+ baseDN.toNormalizedString()));
attributes.add(Attributes.create("generation-id",
- baseDn + " " + generationId));
+ baseDN + " " + generationId));
// Missing changes
long missingChanges = getDomainMonitorData().getMissingChangesRS(
@@ -2595,7 +2600,7 @@
if (eligibleCSN.olderOrEqual(mostRecentDbCSN))
{
// let's try to seek the first change <= eligibleCSN
- CSN newCSN = changelogDB.getCSNAfter(baseDn, serverId, eligibleCSN);
+ CSN newCSN = changelogDB.getCSNAfter(baseDN, serverId, eligibleCSN);
result.update(newCSN);
} else {
// for this serverId, all changes in the ChangelogDb are holder
@@ -2612,8 +2617,7 @@
if (debugEnabled())
{
- TRACER
- .debugInfo("In " + this + " getEligibleState() result is " + result);
+ debug("getEligibleState() result is " + result);
}
return result;
}
@@ -2629,7 +2633,7 @@
public ServerState getStartState()
{
ServerState domainStartState = new ServerState();
- for (CSN firstCSN : changelogDB.getDomainFirstCSNs(baseDn).values())
+ for (CSN firstCSN : changelogDB.getDomainFirstCSNs(baseDN).values())
{
domainStartState.update(firstCSN);
}
@@ -2650,7 +2654,7 @@
CSN eligibleCSN = null;
for (Entry<Integer, CSN> entry :
- changelogDB.getDomainLastCSNs(baseDn).entrySet())
+ changelogDB.getDomainLastCSNs(baseDN).entrySet())
{
// Consider this producer (DS/db).
final int serverId = entry.getKey();
@@ -2767,7 +2771,7 @@
logError(ERR_CHANGELOG_ERROR_SENDING_MSG
.get("Replication Server "
+ localReplicationServer.getReplicationPort() + " "
- + baseDn + " " + localReplicationServer.getServerId()));
+ + baseDN + " " + localReplicationServer.getServerId()));
stopServer(rsHandler, false);
}
}
@@ -2844,7 +2848,7 @@
*/
public long getLatestDomainTrimDate()
{
- return changelogDB.getDomainLatestTrimDate(baseDn);
+ return changelogDB.getDomainLatestTrimDate(baseDN);
}
/**
@@ -2962,8 +2966,9 @@
private void debug(String message)
{
- TRACER.debugInfo("In RS serverId=" + localReplicationServer.getServerId()
- + " for baseDn=" + baseDn + " and port="
- + localReplicationServer.getReplicationPort() + ": " + message);
+ TRACER.debugInfo("In ReplicationServerDomain serverId="
+ + localReplicationServer.getServerId() + " for baseDN=" + baseDN
+ + " and port=" + localReplicationServer.getReplicationPort()
+ + ": " + message);
}
}
--
Gitblit v1.10.0