From 905da506252c98b57cc0cfc82ee5a453c0e15e9b Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 14 Aug 2013 09:30:53 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java | 155 ++++++++++++++++++---------------------------------
1 files changed, 56 insertions(+), 99 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index a040f00..2dbda07 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -27,35 +27,21 @@
*/
package org.opends.server.replication.server;
+import java.io.IOException;
+import java.util.*;
+import java.util.zip.DataFormatException;
+
+import org.opends.messages.Message;
+import org.opends.server.replication.common.*;
+import org.opends.server.replication.protocol.*;
+import org.opends.server.types.*;
+
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.StatusMachine.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.zip.DataFormatException;
-
-import org.opends.messages.Message;
-import org.opends.server.replication.common.AssuredMode;
-import org.opends.server.replication.common.DSInfo;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.common.StatusMachine;
-import org.opends.server.replication.common.StatusMachineEvent;
-import org.opends.server.replication.protocol.*;
-import org.opends.server.types.Attribute;
-import org.opends.server.types.AttributeBuilder;
-import org.opends.server.types.Attributes;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.ResultCode;
-
/**
* This class defines a server handler, which handles all interaction with a
* peer server (RS or DS).
@@ -104,11 +90,11 @@
* @param newGenId The new generation id to take into account
* @throws IOException If IO error occurred.
*/
- public void changeStatusForResetGenId(long newGenId)
- throws IOException
+ public void changeStatusForResetGenId(long newGenId) throws IOException
{
- StatusMachineEvent event;
+ final int localRsServerId = replicationServer.getServerId();
+ StatusMachineEvent event;
if (newGenId == -1)
{
// The generation id is being made invalid, let's put the DS
@@ -127,9 +113,8 @@
if (debugEnabled())
{
TRACER.debugInfo(
- "In RS " +
- replicationServerDomain.getReplicationServer().getServerId() +
- ". Closing connection to DS " + getServerId() +
+ "In RS " + localRsServerId +
+ ", closing connection to DS " + getServerId() +
" for baseDn " + getBaseDN() +
" to force reconnection as new local" +
" generationId and remote one match and DS is in bad gen id: " +
@@ -140,20 +125,19 @@
// would rewait the RSD lock that we already must have entering this
// method. This would lead to a reentrant lock which we do not want.
// So simply close the session, this will make the hang up appear
- // after the reader thread that took the RSD lock realeases it.
- if (session != null)
+ // after the reader thread that took the RSD lock releases it.
+ if (session != null
+ // V4 protocol introduced a StopMsg to properly close the
+ // connection between servers
+ && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
- // V4 protocol introduces a StopMsg to properly close the
- // connection between servers
- if (getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ try
{
- try
- {
- session.publish(new StopMsg());
- } catch (IOException ioe)
- {
- // Anyway, going to close session, so nothing to do
- }
+ session.publish(new StopMsg());
+ }
+ catch (IOException ioe)
+ {
+ // Anyway, going to close session, so nothing to do
}
}
@@ -165,12 +149,10 @@
{
if (debugEnabled())
{
- TRACER.debugInfo(
- "In RS " +
- replicationServerDomain.getReplicationServer().getServerId() +
- ". DS " + getServerId() + " for baseDn " + getBaseDN() +
- " has already generation id " + newGenId +
- " so no ChangeStatusMsg sent to him.");
+ TRACER.debugInfo("In RS " + localRsServerId + ". DS "
+ + getServerId() + " for baseDn " + getBaseDN()
+ + " has already generation id " + newGenId
+ + " so no ChangeStatusMsg sent to him.");
}
return;
}
@@ -182,14 +164,13 @@
}
}
- if ((event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT) &&
- (status == ServerStatus.FULL_UPDATE_STATUS))
+ if (event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT
+ && status == ServerStatus.FULL_UPDATE_STATUS)
{
// Prevent useless error message (full update status cannot lead to bad
// gen status)
Message message = NOTE_BAD_GEN_ID_IN_FULL_UPDATE.get(
- Integer.toString(replicationServerDomain.
- getReplicationServer().getServerId()),
+ Integer.toString(localRsServerId),
getBaseDN(),
Integer.toString(serverId),
Long.toString(generationId),
@@ -214,11 +195,9 @@
if (debugEnabled())
{
- TRACER.debugInfo(
- "In RS " +
- replicationServerDomain.getReplicationServer().getServerId() +
- " Sending change status for reset gen id to " + getServerId() +
- " for baseDn " + getBaseDN() + ":\n" + csMsg);
+ TRACER.debugInfo("In RS " + localRsServerId
+ + " Sending change status for reset gen id to " + getServerId()
+ + " for baseDn " + getBaseDN() + ":\n" + csMsg);
}
session.publish(csMsg);
@@ -257,11 +236,9 @@
if (debugEnabled())
{
- TRACER.debugInfo(
- "In RS " +
- replicationServerDomain.getReplicationServer().getServerId() +
- " Sending change status from status analyzer to " + getServerId() +
- " for baseDn " + getBaseDN() + ":\n" + csMsg);
+ TRACER.debugInfo("In RS " + replicationServer.getServerId()
+ + " Sending change status from status analyzer to " + getServerId()
+ + " for baseDn " + getBaseDN() + ":\n" + csMsg);
}
session.publish(csMsg);
@@ -296,14 +273,13 @@
// Add the specific DS ones
attributes.add(Attributes.create("replica", serverURL));
attributes.add(Attributes.create("connected-to",
- this.replicationServerDomain.getReplicationServer()
- .getMonitorInstanceName()));
+ this.replicationServer.getMonitorInstanceName()));
MonitorData md = replicationServerDomain.getDomainMonitorData();
// Oldest missing update
- Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
- if ((approxFirstMissingDate != null) && (approxFirstMissingDate > 0))
+ long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
+ if (approxFirstMissingDate > 0)
{
Date date = new Date(approxFirstMissingDate);
attributes.add(Attributes.create(
@@ -314,14 +290,12 @@
}
// Missing changes
- long missingChanges = md.getMissingChanges(serverId);
- attributes.add(Attributes.create("missing-changes", String
- .valueOf(missingChanges)));
+ attributes.add(Attributes.create("missing-changes",
+ String.valueOf(md.getMissingChanges(serverId))));
// Replication delay
- long delay = md.getApproxDelay(serverId);
- attributes.add(Attributes.create("approximate-delay", String
- .valueOf(delay)));
+ attributes.add(Attributes.create("approximate-delay",
+ String.valueOf(md.getApproxDelay(serverId))));
/* get the Server State */
AttributeBuilder builder = new AttributeBuilder("server-state");
@@ -541,18 +515,9 @@
{
Message errMessage = ERR_DS_DISCONNECTED_DURING_HANDSHAKE.get(
Integer.toString(inServerStartMsg.getServerId()),
- Integer.toString(replicationServerDomain.getReplicationServer().
- getServerId()));
+ Integer.toString(replicationServer.getServerId()));
throw new DirectoryException(ResultCode.OTHER, errMessage);
}
- catch (NotSupportedOldVersionPDUException e)
- {
- // We do not need to support DS V1 connection, we just accept RS V1
- // connection:
- // We just trash the message, log the event for debug purpose and close
- // the connection
- throw new DirectoryException(ResultCode.OTHER, null, null);
- }
catch (Exception e)
{
// We do not need to support DS V1 connection, we just accept RS V1
@@ -588,7 +553,7 @@
}
finally
{
- if ((replicationServerDomain != null) &&
+ if (replicationServerDomain != null &&
replicationServerDomain.hasLock())
replicationServerDomain.release();
}
@@ -610,21 +575,20 @@
{
// Peer DS uses protocol < V4 : send it a ReplServerStartMsg
startMsg = new ReplServerStartMsg(getReplicationServerId(),
- getReplicationServerURL(), getBaseDN(), maxRcvWindow,
+ getReplicationServerURL(), getBaseDN(), maxRcvWindow,
replicationServerDomain.getDbServerState(),
localGenerationId, sslEncryption, getLocalGroupId(),
- replicationServerDomain.getReplicationServer()
- .getDegradedStatusThreshold());
+ replicationServer.getDegradedStatusThreshold());
}
else
{
// Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
startMsg = new ReplServerStartDSMsg(getReplicationServerId(),
- getReplicationServerURL(), getBaseDN(), maxRcvWindow,
+ getReplicationServerURL(), getBaseDN(), maxRcvWindow,
replicationServerDomain.getDbServerState(),
localGenerationId, sslEncryption, getLocalGroupId(),
- replicationServerDomain.getReplicationServer()
- .getDegradedStatusThreshold(), replicationServer.getWeight(),
+ replicationServer.getDegradedStatusThreshold(),
+ replicationServer.getWeight(),
replicationServerDomain.getConnectedLDAPservers().size());
}
@@ -651,17 +615,10 @@
{
if (serverId != 0)
{
- StringBuilder builder = new StringBuilder("Replica DS(");
- builder.append(serverId);
- builder.append(") for domain \"");
- builder.append(replicationServerDomain.getBaseDn());
- builder.append("\"");
- return builder.toString();
+ return "Replica DS(" + serverId + ") for domain \""
+ + replicationServerDomain.getBaseDn() + "\"";
}
- else
- {
- return "Unknown server";
- }
+ return "Unknown server";
}
/**
@@ -740,7 +697,7 @@
else
{
// We are an empty ReplicationServer
- if ((generationId > 0) && (!getServerState().isEmpty()))
+ if (generationId > 0 && !getServerState().isEmpty())
{
// If the LDAP server has already sent changes
// it is not expected to connect to an empty RS
--
Gitblit v1.10.0