From 7c30dbb5403772b323df3ad907d9ed15d23b5aee Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Thu, 29 Apr 2010 20:35:40 +0000
Subject: [PATCH] Last batch of changes for this week. This adds support for the IETF based Password Policy for LDAP as SubEntry. Also resolves the following issues : - 4544 : initializeBackend() should not set JE env config params directly. - 4478 : ECL in draft compat mode / search lastchangenumber can be very long - 4538 : Virtual attributes not retrieved when entry cache configured - 4547 : Search Filter Matching differ for cn=Directory Manager and plain user. - 4514 : Logs shows unexpected message with replication monitoring data missing (Partial fix) - 4534 : Replication using security does not work after server restart - 4516 : SEVERE_ERROR: servers (...) have the same ServerId In addition, they also improve reliability and performance in various areas including CollectiveAttributes, Virtual Attributes and Subentries management, Schema loading, Replication...
---
opends/src/server/org/opends/server/replication/server/ReplicationDB.java | 393 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
1 files changed, 383 insertions(+), 10 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index 0e045bc..86e27fb 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2006-2009 Sun Microsystems, Inc.
+ * Copyright 2006-2010 Sun Microsystems, Inc.
*/
package org.opends.server.replication.server;
import org.opends.messages.MessageBuilder;
@@ -37,6 +37,7 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.zip.DataFormatException;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.DatabaseEntry;
@@ -67,6 +68,44 @@
// close the db (shutdown or clear).
private ReentrantReadWriteLock dbCloseLock;
+ // Change counter management
+ // The Db itself does not allow to count records between a start and an end
+ // change. And we cannot rely on the replication seqnum that is part of the
+ // changenumber, since there can be holes (when an operation is canceled).
+ // And traversing all the records from the start one to the end one works
+ // fine but can be very long (ECL:lastChangeNumber).
+ //
+ // So we are storing special records in the DB (called counter records),
+ // that contain the number of changes since the previous counter record.
+ // One special record is :
+ // - a special key : changetime , serverid=0 seqnum=0
+ // - a counter value : count of changes since previous counter record.
+ //
+ // A counter record has to follow the order of the db, so it needs to have
+ // a changenumber key that follow the order.
+ // A counter record must have its own chagenumber key since the Db does not
+ // support duplicate key (it is a compatibility breaker character of the DB).
+ //
+ // We define 2 conditions to store a counter record :
+ // 1/- at least 'counterWindowSize' changes have been stored in the Db
+ // since the previous counter record
+ // 2/- the change to be stored has a new timestamp - so that the counter
+ // record is the first record for this timestamp.
+ //
+
+
+ private int counterCurrValue = 1;
+ // Current value of the counter.
+
+ private long counterTsLimit = 0;
+ // When not null,
+ // the next change with a ts different from tsForNewCounterRecord will lead
+ // to store a new counterRecord.
+
+ private int counterWindowSize = 1000;
+ // The counter record will never be written to the db more often than each
+ // counterWindowSize changes.
+
/**
* Creates a new database or open existing database that will be used
* to store and retrieve changes from an LDAP server.
@@ -92,6 +131,64 @@
true).getGenerationId());
dbCloseLock = new ReentrantReadWriteLock(true);
+
+ //
+ Cursor cursor = null;
+ Transaction txn = null;
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ OperationStatus status;
+ int distBackToCounterRecord = 0;
+
+ // Initialize counter
+ this.counterCurrValue = 1;
+ cursor = db.openCursor(txn, null);
+ status = cursor.getLast(key, data, LockMode.DEFAULT);
+ while (status == OperationStatus.SUCCESS)
+ {
+ try
+ {
+ ChangeNumber cn =new ChangeNumber(new String(key.getData(), "UTF-8"));
+ if (!ReplicationDB.isaCounter(cn))
+ {
+ status = cursor.getPrev(key, data, LockMode.DEFAULT);
+ distBackToCounterRecord++;
+ }
+ else
+ {
+ // counter record
+ counterCurrValue = decodeCounterValue(data.getData())+1;
+ counterTsLimit = cn.getTime();
+ break;
+ }
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
+ mb.append(stackTraceToSingleLineString(e));
+ logError(mb.toMessage());
+ replicationServer.shutdown();
+ if (txn != null)
+ {
+ try
+ {
+ txn.abort();
+ } catch (DatabaseException e1)
+ {
+ // can't do much more. The ReplicationServer is shuting down.
+ }
+ }
+ replicationServer.shutdown();
+ }
+ catch (DataFormatException e)
+ {
+ // Should never happen
+ }
+ }
+ counterCurrValue += distBackToCounterRecord;
+ cursor.close();
+
}
/**
@@ -123,9 +220,31 @@
{
DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
DatabaseEntry data = new ReplicationData(change);
- db.put(txn, key, data);
- }
+ if ((counterCurrValue!=0) &&
+ (counterCurrValue%counterWindowSize == 0))
+ {
+ // enough changes to generate a counter record - wait for the next
+ // change fo time
+ counterTsLimit = change.getChangeNumber().getTime();
+ }
+ if ((counterTsLimit!=0)
+ && (change.getChangeNumber().getTime() != counterTsLimit))
+ {
+ // Write the counter record
+ DatabaseEntry counterKey = new ReplicationKey(
+ new ChangeNumber(
+ change.getChangeNumber().getTime(),
+ 0, 0));
+ DatabaseEntry counterValue =
+ encodeCounterValue(counterCurrValue-1);
+ db.put(txn, counterKey, counterValue);
+ counterTsLimit=0;
+ }
+ db.put(txn, key, data);
+ counterCurrValue++;
+
+ }
txn.commitWriteNoSync();
txn = null;
done = true;
@@ -275,6 +394,7 @@
{
Cursor cursor = null;
String str = null;
+ ChangeNumber cn = null;
try
{
@@ -301,11 +421,25 @@
try
{
str = new String(key.getData(), "UTF-8");
+ cn = new ChangeNumber(str);
+ if (ReplicationDB.isaCounter(cn))
+ {
+ // First record is a counter record .. go next
+ status = cursor.getNext(key, data, LockMode.DEFAULT);
+ if (status != OperationStatus.SUCCESS)
+ {
+ // DB contains only a counter record
+ return null;
+ }
+ else
+ {
+ cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
+ }
+ }
} catch (UnsupportedEncodingException e)
{
// never happens
}
- return new ChangeNumber(str);
}
finally
{
@@ -320,8 +454,9 @@
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
replicationServer.shutdown();
- return null;
+ cn = null;
}
+ return cn;
}
/**
@@ -331,7 +466,7 @@
public ChangeNumber readLastChange()
{
Cursor cursor = null;
- String str = null;
+ ChangeNumber cn = null;
try
{
@@ -349,13 +484,23 @@
}
try
{
- str = new String(key.getData(), "UTF-8");
+ String str = new String(key.getData(), "UTF-8");
+ cn = new ChangeNumber(str);
+ if (ReplicationDB.isaCounter(cn))
+ {
+ if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
+ OperationStatus.SUCCESS)
+ {
+ /* database only contain a counter record - don't know
+ * how much it can be possible but ... */
+ cn = null;
+ }
+ }
}
catch (UnsupportedEncodingException e)
{
// never happens
}
- return new ChangeNumber(str);
}
finally
{
@@ -369,8 +514,9 @@
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
replicationServer.shutdown();
- return null;
+ cn = null;
}
+ return cn;
}
/**
@@ -611,7 +757,14 @@
{
return null;
}
- try {
+ try
+ {
+ ChangeNumber cn=new ChangeNumber(new String(key.getData(), "UTF-8"));
+ if(ReplicationDB.isaCounter(cn))
+ {
+ // counter record
+ continue;
+ }
currentChange = ReplicationData.generateChange(data.getData());
} catch (Exception e) {
/*
@@ -681,4 +834,224 @@
dbCloseLock.writeLock().unlock();
}
}
+ /**
+ * Count the number of changes between 2 changes numbers (inclusive).
+ * @param start The lower limit of the count.
+ * @param stop The higher limit of the count.
+ * @return The number of changes between provided start and stop changeNumber.
+ * Returns -1 when an error occurs.
+ */
+ public int count(ChangeNumber start, ChangeNumber stop)
+ {
+ int counterRecord1 = 0;
+ int counterRecord2 = 0;
+ int distToCounterRecord1 = 0;
+ int distBackToCounterRecord2 = 0;
+ int count=0;
+ Cursor cursor = null;
+ Transaction txn = null;
+ OperationStatus status;
+ try
+ {
+ ChangeNumber cn ;
+
+ if ((start==null)&&(stop==null))
+ return (int)db.count();
+
+ // Step 1 : from the start point, traverse db to the next counter record
+ // or to the stop point.
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ cursor = db.openCursor(txn, null);
+ if (start != null)
+ {
+ key = new ReplicationKey(start);
+ status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
+ if (status == OperationStatus.NOTFOUND)
+ status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
+ }
+ else
+ {
+ status = cursor.getNext(key, data, LockMode.DEFAULT);
+ }
+
+ while (status == OperationStatus.SUCCESS)
+ {
+ // test whether the record is a regular change or a counter
+ String csnString = new String(key.getData(), "UTF-8");
+ cn = new ChangeNumber(csnString);
+ if (cn.getServerId() != 0)
+ {
+ // reached a regular change record
+ // test whether we reached the 'stop' target
+ if (!cn.newer(stop))
+ {
+ // let's loop
+ distToCounterRecord1++;
+ status = cursor.getNext(key, data, LockMode.DEFAULT);
+ }
+ else
+ {
+ // reached the end
+ break;
+ }
+ }
+ else
+ {
+ // counter record
+ counterRecord1 = decodeCounterValue(data.getData());
+ break;
+ }
+ }
+ cursor.close();
+
+ // cases
+ //
+ if (counterRecord1==0)
+ return distToCounterRecord1;
+
+ // Step 2 : from the stop point, traverse db to the next counter record
+ // or to the start point.
+ txn = null;
+ data = new DatabaseEntry();
+ key = new ReplicationKey(stop);
+ cursor = db.openCursor(txn, null);
+ status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
+ if (status == OperationStatus.SUCCESS)
+ {
+ cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
+ }
+ else
+ {
+ key = new DatabaseEntry();
+ data = new DatabaseEntry();
+ status = cursor.getLast(key, data, LockMode.DEFAULT);
+ if (status != OperationStatus.SUCCESS)
+ {
+ /* database is empty */
+ return 0;
+ }
+ }
+ while (status == OperationStatus.SUCCESS)
+ {
+ cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
+ if (!ReplicationDB.isaCounter(cn))
+ {
+ // regular change record
+ if (!cn.older(start))
+ {
+ distBackToCounterRecord2++;
+ status = cursor.getPrev(key, data, LockMode.DEFAULT);
+ }
+ else
+ break;
+ }
+ else
+ {
+ // counter record
+ counterRecord2 = decodeCounterValue(data.getData());
+ break;
+ }
+ }
+ cursor.close();
+
+ // Step 3 : Now consolidates the result
+ if (counterRecord1!=0)
+ {
+ if (counterRecord1 == counterRecord2)
+ {
+ // only one cp between from and to - no need to use it
+ count = distToCounterRecord1 + distBackToCounterRecord2;
+ }
+ else
+ {
+ // 2 cp between from and to
+ count = distToCounterRecord1 + (counterRecord2-counterRecord1)
+ + distBackToCounterRecord2;
+ }
+ }
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
+ mb.append(stackTraceToSingleLineString(e));
+ logError(mb.toMessage());
+ replicationServer.shutdown();
+ }
+ catch (DataFormatException e)
+ {
+ // Should never happen
+ }
+ finally
+ {
+ if (cursor != null)
+ cursor.close();
+ if (txn != null)
+ {
+ try
+ {
+ txn.abort();
+ } catch (DatabaseException e1)
+ {
+ // can't do much more. The ReplicationServer is shuting down.
+ }
+ }
+ }
+ return count;
+ }
+
+ /**
+ * Test if a provided changeNumber represents a counter record.
+ * @param cn The provided changeNumber.
+ * @return True if the provided changenumber is a counter.
+ */
+ static private boolean isaCounter(ChangeNumber cn)
+ {
+ return ((cn.getServerId()== 0) && (cn.getSeqnum()==0));
+ }
+
+ /**
+ * Decode the provided database entry as a the value of a counter.
+ * @param entry The provided entry.
+ * @return The counter value.
+ * @throws DataFormatException
+ */
+ private static int decodeCounterValue(byte[] entry)
+ throws DataFormatException
+ {
+ try
+ {
+ String numAckStr = new String(entry, 0, entry.length, "UTF-8");
+ return Integer.parseInt(numAckStr);
+
+ } catch (UnsupportedEncodingException e)
+ {
+ throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ }
+ }
+
+ /**
+ * Encode the provided counter value in a database entry.
+ * @param entry The provided entry.
+ * @return The databse entry with the counter value encoded inside..
+ * @throws UnsupportedEncodingException
+ */
+ static private DatabaseEntry encodeCounterValue(int value)
+ throws UnsupportedEncodingException
+ {
+ DatabaseEntry entry = new DatabaseEntry();
+ entry.setData(String.valueOf(value).getBytes("UTF-8"));
+ return entry;
+ }
+
+ /**
+ * Set the counter writing window size (public method for unit tests only).
+ * @param size Size in number of record.
+ */
+ public void setCounterWindowSize(int size)
+ {
+ this.counterWindowSize = size;
+ }
+
}
--
Gitblit v1.10.0