From b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Sun, 02 Sep 2007 17:58:07 +0000
Subject: [PATCH] fix for #1733 & #845 - Initialization of replication
---
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java | 481 +++++++++++++++++++++++++++++++++++++++++++----------
1 files changed, 391 insertions(+), 90 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
index 0a4f630..fc7742c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -25,10 +25,12 @@
* Portions Copyright 2006-2007 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
-import org.opends.messages.Message;
-import org.opends.messages.MessageBuilder;
+import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -58,6 +60,12 @@
private Environment dbEnvironment = null;
private Database stateDb = null;
private ReplicationServer replicationServer = null;
+ private static final String GENERATION_ID_TAG = "GENID";
+ private static final String FIELD_SEPARATOR = " ";
+ /**
+ * The tracer object for the debug logger.
+ */
+ private static final DebugTracer TRACER = getTracer();
/**
* Initialize this class.
@@ -117,16 +125,120 @@
Cursor cursor = stateDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry data = new DatabaseEntry();
+
try
{
+ /*
+ * Get the domain base DN/ generationIDs records
+ */
OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
while (status == OperationStatus.SUCCESS)
{
try
{
String stringData = new String(data.getData(), "UTF-8");
- String[] str = stringData.split(" ", 2);
- short serverId = new Short(str[0]);
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " Read tag baseDn generationId=" + stringData);
+
+ String[] str = stringData.split(FIELD_SEPARATOR, 3);
+ if (str[0].equals(GENERATION_ID_TAG))
+ {
+ long generationId=-1;
+
+ DN baseDn;
+
+ try
+ {
+ // <generationId>
+ generationId = new Long(str[1]);
+ }
+ catch (NumberFormatException e)
+ {
+ // should never happen
+ // TODO: i18n
+ throw new ReplicationDBException(Message.raw(
+ "replicationServer state database has a wrong format: " +
+ e.getLocalizedMessage()
+ + "<" + str[1] + ">"));
+ }
+
+ // <baseDn>
+ baseDn = null;
+ try
+ {
+ baseDn = DN.decode(str[2]);
+ } catch (DirectoryException e)
+ {
+ Message message =
+ ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]);
+ logError(message);
+
+ }
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " Has read baseDn=" + baseDn
+ + " generationId=" + generationId);
+
+ replicationServer.getReplicationCache(baseDn, true).
+ setGenerationId(generationId, true);
+ }
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // should never happens
+ // TODO: i18n
+ throw new ReplicationDBException(Message.raw("need UTF-8 support"));
+ }
+ status = cursor.getNext(key, data, LockMode.DEFAULT);
+ }
+
+ /*
+ * Get the server Id / domain base DN records
+ */
+ status = cursor.getFirst(key, data, LockMode.DEFAULT);
+ while (status == OperationStatus.SUCCESS)
+ {
+ String stringData = null;
+ try
+ {
+ stringData = new String(data.getData(), "UTF-8");
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // should never happens
+ // TODO: i18n
+ throw new ReplicationDBException(Message.raw(
+ "need UTF-8 support"));
+ }
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " Read serverId BaseDN=" + stringData);
+
+ String[] str = stringData.split(FIELD_SEPARATOR, 2);
+ if (!str[0].equals(GENERATION_ID_TAG))
+ {
+ short serverId = -1;
+ try
+ {
+ // <serverId>
+ serverId = new Short(str[0]);
+ } catch (NumberFormatException e)
+ {
+ // should never happen
+ // TODO: i18n
+ throw new ReplicationDBException(Message.raw(
+ "replicationServer state database has a wrong format: " +
+ e.getLocalizedMessage()
+ + "<" + str[0] + ">"));
+ }
+ // <baseDn>
DN baseDn = null;
try
{
@@ -134,113 +246,302 @@
} catch (DirectoryException e)
{
Message message =
- ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]);
+ ERR_IGNORE_BAD_DN_IN_DATABASE_IDENTIFIER.get(str[1]);
logError(message);
}
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " Has read: baseDn=" + baseDn
+ + " serverId=" + serverId);
+
DbHandler dbHandler =
- new DbHandler(serverId, baseDn, replicationServer, this);
- replicationServer.getReplicationCache(baseDn).newDb(serverId,
- dbHandler);
- } catch (NumberFormatException e)
- {
- // should never happen
- // TODO: i18n
- throw new ReplicationDBException(Message.raw(
- "replicationServer state database has a wrong format"));
- } catch (UnsupportedEncodingException e)
- {
- // should never happens
- // TODO: i18n
- throw new ReplicationDBException(Message.raw(
- "need UTF-8 support"));
+ new DbHandler(serverId, baseDn, replicationServer, this, 1);
+
+ replicationServer.getReplicationCache(baseDn, true).
+ setDbHandler(serverId, dbHandler);
}
+
status = cursor.getNext(key, data, LockMode.DEFAULT);
}
cursor.close();
- } catch (DatabaseException dbe) {
+ }
+ catch (DatabaseException dbe)
+ {
cursor.close();
throw dbe;
}
}
- /**
- * Find or create the database used to store changes from the server
- * with the given serverId and the given baseDn.
- * @param serverId The server id that identifies the server.
- * @param baseDn The baseDn that identifies the server.
- * @return the Database.
- * @throws DatabaseException in case of underlying Exception.
- */
- public Database getOrAddDb(Short serverId, DN baseDn)
- throws DatabaseException
- {
- try
+ /**
+ * Finds or creates the database used to store changes from the server
+ * with the given serverId and the given baseDn.
+ *
+ * @param serverId The server id that identifies the server.
+ * @param baseDn The baseDn that identifies the domain.
+ * @param generationId The generationId associated to this domain.
+ * @return the Database.
+ * @throws DatabaseException in case of underlying Exception.
+ */
+ public Database getOrAddDb(Short serverId, DN baseDn, Long generationId)
+ throws DatabaseException
{
- String stringId = serverId.toString() + " " + baseDn.toNormalizedString();
- byte[] byteId;
-
- byteId = stringId.getBytes("UTF-8");
-
- // Open the database. Create it if it does not already exist.
- DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setAllowCreate(true);
- dbConfig.setTransactional(true);
- Database db = dbEnvironment.openDatabase(null, stringId, dbConfig);
-
- DatabaseEntry key = new DatabaseEntry();
- key.setData(byteId);
- DatabaseEntry data = new DatabaseEntry();
- OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
- if (status == OperationStatus.NOTFOUND)
+ if (debugEnabled())
+ TRACER.debugInfo("ReplicationDbEnv.getOrAddDb() " +
+ serverId + " " + baseDn + " " + generationId);
+ try
{
- Transaction txn = dbEnvironment.beginTransaction(null, null);
- try {
- data.setData(byteId);
- stateDb.put(txn, key, data);
- txn.commitWriteNoSync();
- } catch (DatabaseException dbe)
+ String stringId = serverId.toString() + FIELD_SEPARATOR
+ + baseDn.toNormalizedString();
+
+ // Opens the database for the changes received from this server
+ // on this domain. Create it if it does not already exist.
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setAllowCreate(true);
+ dbConfig.setTransactional(true);
+ Database db = dbEnvironment.openDatabase(null, stringId, dbConfig);
+
+ // Creates the record serverId/domain base Dn in the stateDb
+ // if it does not already exist.
+ byte[] byteId;
+ byteId = stringId.getBytes("UTF-8");
+ DatabaseEntry key = new DatabaseEntry();
+ key.setData(byteId);
+ DatabaseEntry data = new DatabaseEntry();
+ OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
+ if (status == OperationStatus.NOTFOUND)
{
- // Abort the txn and propagate the Exception to the caller
- txn.abort();
- throw dbe;
+ Transaction txn = dbEnvironment.beginTransaction(null, null);
+ try {
+ data.setData(byteId);
+ if (debugEnabled())
+ TRACER.debugInfo("getOrAddDb() Created in the state Db record " +
+ " serverId/Domain=<"+stringId+">");
+ stateDb.put(txn, key, data);
+ txn.commitWriteNoSync();
+ } catch (DatabaseException dbe)
+ {
+ // Abort the txn and propagate the Exception to the caller
+ txn.abort();
+ throw dbe;
+ }
+ }
+
+ // Creates the record domain base Dn/ generationId in the stateDb
+ // if it does not already exist.
+ stringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
+ baseDn.toNormalizedString();
+ String dataStringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
+ generationId.toString() + FIELD_SEPARATOR +
+ baseDn.toNormalizedString();
+ byteId = stringId.getBytes("UTF-8");
+ byte[] dataByteId;
+ dataByteId = dataStringId.getBytes("UTF-8");
+ key = new DatabaseEntry();
+ key.setData(byteId);
+ data = new DatabaseEntry();
+ status = stateDb.get(null, key, data, LockMode.DEFAULT);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ Transaction txn = dbEnvironment.beginTransaction(null, null);
+ try {
+ data.setData(dataByteId);
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "Created in the state Db record Tag/Domain/GenId key=" +
+ stringId + " value=" + dataStringId);
+ stateDb.put(txn, key, data);
+ txn.commitWriteNoSync();
+ } catch (DatabaseException dbe)
+ {
+ // Abort the txn and propagate the Exception to the caller
+ txn.abort();
+ throw dbe;
+ }
+ }
+ return db;
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // can't happen
+ return null;
+ }
+ }
+
+ /**
+ * Creates a new transaction.
+ *
+ * @return the transaction.
+ * @throws DatabaseException in case of underlying database Exception.
+ */
+ public Transaction beginTransaction() throws DatabaseException
+ {
+ return dbEnvironment.beginTransaction(null, null);
+ }
+
+ /**
+ * Shutdown the Db environment.
+ */
+ public void shutdown()
+ {
+ try
+ {
+ stateDb.close();
+ dbEnvironment.close();
+ } catch (DatabaseException e)
+ {
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
+ mb.append(stackTraceToSingleLineString(e));
+ logError(mb.toMessage());
+ }
+ }
+
+ /**
+ * Clears the provided generationId associated to the provided baseDn
+ * from the state Db.
+ *
+ * @param baseDn The baseDn for which the generationID must be cleared.
+ *
+ */
+ public void clearGenerationId(DN baseDn)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " clearGenerationId " + baseDn);
+ try
+ {
+ // Deletes the record domain base Dn/ generationId in the stateDb
+ String stringId = GENERATION_ID_TAG + FIELD_SEPARATOR +
+ baseDn.toNormalizedString();
+ byte[] byteId = stringId.getBytes("UTF-8");
+ DatabaseEntry key = new DatabaseEntry();
+ key.setData(byteId);
+ DatabaseEntry data = new DatabaseEntry();
+ OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
+ if ((status == OperationStatus.SUCCESS) ||
+ (status == OperationStatus.KEYEXIST))
+ {
+ Transaction txn = dbEnvironment.beginTransaction(null, null);
+ try
+ {
+ stateDb.delete(txn, key);
+ txn.commitWriteNoSync();
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " clearGenerationId (" +
+ baseDn +") succeeded.");
+ }
+ catch (DatabaseException dbe)
+ {
+ // Abort the txn and propagate the Exception to the caller
+ txn.abort();
+ throw dbe;
+ }
+ }
+ else
+ {
+ // TODO : should have a better error logging
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ " clearGenerationId ("+ baseDn + " failed" + status.toString());
}
}
- return db;
- } catch (UnsupportedEncodingException e)
- {
- // can't happen
- return null;
+ catch (UnsupportedEncodingException e)
+ {
+ // can't happen
+ }
+ catch (DatabaseException dbe)
+ {
+ // can't happen
+ }
}
- }
- /**
- * Creates a new transaction.
- *
- * @return the transaction.
- * @throws DatabaseException in case of underlying database Exception.
- */
- public Transaction beginTransaction() throws DatabaseException
- {
- return dbEnvironment.beginTransaction(null, null);
- }
+ /**
+ * Clears the provided serverId associated to the provided baseDn
+ * from the state Db.
+ *
+ * @param baseDn The baseDn for which the generationID must be cleared.
+ * @param serverId The serverId to remove from the Db.
+ *
+ */
+ public void clearServerId(DN baseDn, Short serverId)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ "clearServerId(baseDN=" + baseDn + ", serverId=" + serverId);
+ try
+ {
+ String stringId = serverId.toString() + FIELD_SEPARATOR
+ + baseDn.toNormalizedString();
- /**
- * Shutdown the Db environment.
- */
- public void shutdown()
- {
- try
- {
- stateDb.close();
- dbEnvironment.close();
- } catch (DatabaseException e)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
+ // Deletes the record serverId/domain base Dn in the stateDb
+ byte[] byteId;
+ byteId = stringId.getBytes("UTF-8");
+ DatabaseEntry key = new DatabaseEntry();
+ key.setData(byteId);
+ DatabaseEntry data = new DatabaseEntry();
+ OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
+ if (status != OperationStatus.NOTFOUND)
+ {
+ Transaction txn = dbEnvironment.beginTransaction(null, null);
+ try {
+ data.setData(byteId);
+ stateDb.delete(txn, key);
+ txn.commitWriteNoSync();
+ if (debugEnabled())
+ TRACER.debugInfo(
+ " In " + this.replicationServer.getMonitorInstanceName() +
+ " clearServerId() succeeded " + baseDn + " " +
+ serverId);
+ }
+ catch (DatabaseException dbe)
+ {
+ // Abort the txn and propagate the Exception to the caller
+ txn.abort();
+ throw dbe;
+ }
+ }
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // can't happen
+ }
+ catch (DatabaseException dbe)
+ {
+ // can't happen
+ }
}
- }
+ /**
+ * Clears the database.
+ *
+ * @param databaseName The name of the database to clear.
+ */
+ public final void clearDb(String databaseName)
+ {
+ try
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "In " + this.replicationServer.getMonitorInstanceName() +
+ "clearDb" + databaseName);
+
+ Transaction txn = dbEnvironment.beginTransaction(null, null);
+ dbEnvironment.truncateDatabase(txn, databaseName, false);
+ }
+ catch (DatabaseException dbe)
+ {
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_ERROR_CLEARING_DB.get(databaseName,
+ dbe.getLocalizedMessage()));
+ logError(mb.toMessage());
+ }
+ }
}
--
Gitblit v1.10.0