From b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Sun, 02 Sep 2007 17:58:07 +0000
Subject: [PATCH] fix for #1733 & #845 - Initialization of replication
---
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 139 +++++++++++++++++++++++++++++++++++++++++-----
1 files changed, 124 insertions(+), 15 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 2ae06c4..414dd88 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -25,13 +25,13 @@
* Portions Copyright 2006-2007 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
-import org.opends.messages.Message;
-
+import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import org.opends.messages.MessageBuilder;
import static org.opends.server.util.StaticUtils.getFileForPath;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.File;
import java.io.IOException;
@@ -61,6 +61,8 @@
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
import org.opends.server.types.ResultCode;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import org.opends.server.loggers.debug.DebugTracer;
import com.sleepycat.je.DatabaseException;
@@ -101,12 +103,17 @@
private int queueSize;
private String dbDirname = null;
private long trimAge; // the time (in sec) after which the changes must
+ // be deleted from the persistent storage.
private int replicationPort;
- // de deleted from the persistent storage.
private boolean stopListen = false;
private ReplSessionSecurity replSessionSecurity;
/**
+ * The tracer object for the debug logger.
+ */
+ private static final DebugTracer TRACER = getTracer();
+
+ /**
* Creates a new Replication server using the provided configuration entry.
*
* @param configuration The configuration of this replication server.
@@ -191,7 +198,7 @@
// The socket has probably been closed as part of the
// shutdown or changing the port number process.
// just log debug information and loop.
- Message message = DEBUG_REPLICATION_PORT_IOEXCEPTION.get();
+ Message message = ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage());
logError(message);
}
}
@@ -272,6 +279,10 @@
String hostname = serverURL.substring(0, separator);
boolean sslEncryption = replSessionSecurity.isSslEncryption(serverURL);
+ if (debugEnabled())
+ TRACER.debugInfo("RS " + this.getMonitorInstanceName() +
+ " connects to " + serverURL);
+
try
{
InetSocketAddress ServerAddr = new InetSocketAddress(
@@ -329,22 +340,41 @@
listenSocket.bind(new InetSocketAddress(changelogPort));
/*
- * create working threads
+ * creates working threads
+ * We must first connect, then start to listen.
*/
- listenThread =
- new ReplicationServerListenThread("Replication Server Listener", this);
- listenThread.start();
+ if (debugEnabled())
+ TRACER.debugInfo("RS " +getMonitorInstanceName()+
+ " creates connect threads");
connectThread =
new ReplicationServerConnectThread("Replication Server Connect", this);
connectThread.start();
+ // FIXME : Is it better to have the time to receive the ReplServerInfo
+ // from all the other replication servers since this info is necessary
+ // to route an early received total update request.
+
+ if (debugEnabled())
+ TRACER.debugInfo("RS " +getMonitorInstanceName()+
+ " creates listen threads");
+
+ listenThread =
+ new ReplicationServerListenThread("Replication Server Listener", this);
+ listenThread.start();
+
+ if (debugEnabled())
+ TRACER.debugInfo("RS " +getMonitorInstanceName()+
+ " successfully initialized");
+
} catch (DatabaseException e)
{
- Message message = ERR_COULD_NOT_INITIALIZE_DB.get(dbDirname);
+ Message message = ERR_COULD_NOT_INITIALIZE_DB.get(
+ getFileForPath(dbDirname).getAbsolutePath());
logError(message);
} catch (ReplicationDBException e)
{
- Message message = ERR_COULD_NOT_READ_DB.get(dbDirname);
+ Message message = ERR_COULD_NOT_READ_DB.get(dbDirname,
+ e.getLocalizedMessage());
logError(message);
} catch (UnknownHostException e)
{
@@ -362,18 +392,22 @@
* Get the ReplicationCache associated to the base DN given in parameter.
*
* @param baseDn The base Dn for which the ReplicationCache must be returned.
+ * @param create Specifies whether to create the ReplicationCache if it does
+ * not already exist.
* @return The ReplicationCache associated to the base DN given in parameter.
*/
- public ReplicationCache getReplicationCache(DN baseDn)
+ public ReplicationCache getReplicationCache(DN baseDn, boolean create)
{
ReplicationCache replicationCache;
synchronized (baseDNs)
{
replicationCache = baseDNs.get(baseDn);
- if (replicationCache == null)
+ if ((replicationCache == null) && (create))
+ {
replicationCache = new ReplicationCache(baseDn, this);
- baseDNs.put(baseDn, replicationCache);
+ baseDNs.put(baseDn, replicationCache);
+ }
}
return replicationCache;
@@ -384,6 +418,9 @@
*/
public void shutdown()
{
+ if (shutdown)
+ return;
+
shutdown = true;
// shutdown the connect thread
@@ -404,6 +441,12 @@
// replication Server service is closing anyway.
}
+ // shutdown the listen thread
+ if (listenThread != null)
+ {
+ listenThread.interrupt();
+ }
+
// shutdown all the ChangelogCaches
for (ReplicationCache replicationCache : baseDNs.values())
{
@@ -424,13 +467,36 @@
*
* @param id The serverId for which the dbHandler must be created.
* @param baseDn The DN for which the dbHandler muste be created.
+ * @param generationId The generationId for this server and this domain.
* @return The new DB handler for this ReplicationServer and the serverId and
* DN given in parameter.
* @throws DatabaseException in case of underlying database problem.
*/
- DbHandler newDbHandler(short id, DN baseDn) throws DatabaseException
+ public DbHandler newDbHandler(short id, DN baseDn, long generationId)
+ throws DatabaseException
{
- return new DbHandler(id, baseDn, this, dbEnv);
+ return new DbHandler(id, baseDn, this, dbEnv, generationId);
+ }
+
+ /**
+ * Clears the generationId for the domain related to the provided baseDn.
+ * @param baseDn The baseDn for which to delete the generationId.
+ * @throws DatabaseException When it occurs.
+ */
+ public void clearGenerationId(DN baseDn)
+ throws DatabaseException
+ {
+ try
+ {
+ dbEnv.clearGenerationId(baseDn);
+ }
+ catch(Exception e)
+ {
+ TRACER.debugInfo(
+ "In RS <" + getMonitorInstanceName() +
+ " Exception in clearGenerationId" +
+ stackTraceToSingleLineString(e) + e.getLocalizedMessage());
+ }
}
/**
@@ -618,7 +684,50 @@
Attribute bases = new Attribute(baseType, "base-dn", baseValues);
attributes.add(bases);
+ // Publish to monitor the generation ID by domain
+ AttributeType generationIdType=
+ DirectoryServer.getAttributeType("base-dn-generation-id", true);
+ LinkedHashSet<AttributeValue> generationIdValues =
+ new LinkedHashSet<AttributeValue>();
+ for (DN base : baseDNs.keySet())
+ {
+ long generationId=-1;
+ ReplicationCache cache = getReplicationCache(base, false);
+ if (cache != null)
+ generationId = cache.getGenerationId();
+ generationIdValues.add(new AttributeValue(generationIdType,
+ base.toString() + " " + generationId));
+ }
+ Attribute generationIds = new Attribute(generationIdType, "generation-id",
+ generationIdValues);
+ attributes.add(generationIds);
+
return attributes;
}
+ /**
+ * Get the value of generationId for the replication domain
+ * associated with the provided baseDN.
+ *
+ * @param baseDN The baseDN of the domain.
+ * @return The value of the generationID.
+ */
+ public long getGenerationId(DN baseDN)
+ {
+ ReplicationCache rc = this.getReplicationCache(baseDN, false);
+ if (rc!=null)
+ return rc.getGenerationId();
+ return -1;
+ }
+
+ /**
+ * Get the serverId for this replication server.
+ *
+ * @return The value of the serverId.
+ *
+ */
+ public short getServerId()
+ {
+ return serverId;
+ }
}
--
Gitblit v1.10.0