opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -136,7 +136,7 @@ */ private static final DebugTracer TRACER = getTracer(); private static final String EXPORT_BASE_DN = "dc=replicationChanges"; private static final String BASE_DN = "dc=replicationchanges"; // The base DNs for this backend. private DN[] baseDNs; @@ -545,7 +545,7 @@ ReplicationServerDomain rc = rsdi.next(); // Skip containers that are not covered by the include branches. baseDN = DN.decode(rc.getBaseDn().toString() + "," + EXPORT_BASE_DN); baseDN = DN.decode(rc.getBaseDn().toString() + "," + BASE_DN); if (includeBranches == null || includeBranches.isEmpty()) { @@ -662,7 +662,7 @@ try { AddChangeRecordEntry changeRecord = new AddChangeRecordEntry(DN.decode(EXPORT_BASE_DN), new AddChangeRecordEntry(DN.decode(BASE_DN), attributes); ldifWriter.writeChangeRecord(changeRecord); } @@ -706,7 +706,7 @@ { AddChangeRecordEntry changeRecord = new AddChangeRecordEntry(DN.decode( exportContainer.getBaseDn() + "," + EXPORT_BASE_DN), exportContainer.getBaseDn() + "," + BASE_DN), attributes); ldifWriter.writeChangeRecord(changeRecord); } @@ -717,7 +717,7 @@ TRACER.debugCaught(DebugLogLevel.ERROR, e); } Message message = ERR_BACKEND_EXPORT_ENTRY.get( exportContainer.getBaseDn() + "," + EXPORT_BASE_DN, exportContainer.getBaseDn() + "," + BASE_DN, String.valueOf(e)); logError(message); } @@ -788,7 +788,7 @@ dn = DN.decode("puid=" + addMsg.getParentUid() + "," + "changeNumber=" + msg.getChangeNumber().toString() + "," + msg.getDn() +","+ "dc=replicationChanges"); msg.getDn() +","+ BASE_DN); Map<AttributeType,List<Attribute>> attributes = new HashMap<AttributeType,List<Attribute>>(); @@ -831,7 +831,7 @@ dn = DN.decode("uuid=" + msg.getUniqueId() + "," + "changeNumber=" + delMsg.getChangeNumber().toString()+ "," + msg.getDn() +","+ "dc=replicationChanges"); msg.getDn() +","+ BASE_DN); DeleteChangeRecordEntry changeRecord = new DeleteChangeRecordEntry(dn); @@ -854,7 +854,7 @@ dn = DN.decode("uuid=" + msg.getUniqueId() + "," + "changeNumber=" + msg.getChangeNumber().toString()+ "," + msg.getDn() +","+ "dc=replicationChanges"); msg.getDn() +","+ BASE_DN); op.setInternalOperation(true); ModifyChangeRecordEntry changeRecord = @@ -878,7 +878,7 @@ dn = DN.decode("uuid=" + msg.getUniqueId() + "," + "changeNumber=" + msg.getChangeNumber().toString()+ "," + msg.getDn() +","+ "dc=replicationChanges"); msg.getDn() +","+ BASE_DN); op.setInternalOperation(true); ModifyDNChangeRecordEntry changeRecord = @@ -911,8 +911,9 @@ SearchScope scope = searchOperation.getScope(); SearchFilter filter = searchOperation.getFilter(); if (entry.matchesBaseAndScope(searchBaseDN, scope) && filter.matchesEntry(entry)) boolean ms = entry.matchesBaseAndScope(searchBaseDN, scope); boolean mf = filter.matchesEntry(entry); if ( ms && mf ) { searchOperation.returnEntry(entry, new LinkedList<Control>()); } @@ -1188,10 +1189,17 @@ { if (baseDNSet.contains(searchBaseDN)) { searchOperation.returnEntry( new Entry(searchBaseDN, rootObjectclasses, attributes, operationalAttributes), new LinkedList<Control>()); // Get the base DN, scope, and filter for the search. SearchScope scope = searchOperation.getScope(); SearchFilter filter = searchOperation.getFilter(); Entry re = new Entry(searchBaseDN, rootObjectclasses, attributes, operationalAttributes); if (re.matchesBaseAndScope(searchBaseDN, scope) && filter.matchesEntry(re)) { searchOperation.returnEntry(re, new LinkedList<Control>()); } return; } else @@ -1204,6 +1212,18 @@ } } // Get the base DN, scope, and filter for the search. SearchScope scope = searchOperation.getScope(); SearchFilter filter = searchOperation.getFilter(); Entry re = new Entry(searchBaseDN, rootObjectclasses, attributes, operationalAttributes); if (re.matchesBaseAndScope(searchBaseDN, scope) && filter.matchesEntry(re)) { searchOperation.returnEntry(re, new LinkedList<Control>()); } // Walk through all entries and send the ones that match. Iterator<ReplicationServerDomain> rsdi = server.getCacheIterator(); if (rsdi != null) @@ -1213,7 +1233,7 @@ ReplicationServerDomain rsd = rsdi.next(); // Skip containers that are not covered by the include branches. baseDN = DN.decode(rsd.getBaseDn().toString() + "," + EXPORT_BASE_DN); baseDN = DN.decode(rsd.getBaseDn().toString() + "," + BASE_DN); if (searchBaseDN.isDescendantOf(baseDN) || searchBaseDN.isAncestorOf(baseDN)) opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -37,6 +37,7 @@ import org.opends.server.types.DN; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.protocol.UpdateMessage; import java.util.concurrent.locks.ReentrantReadWriteLock; import com.sleepycat.je.Cursor; import com.sleepycat.je.DatabaseEntry; @@ -63,6 +64,10 @@ // The maximum number of retries in case of DatabaseDeadlock Exception. private static final int DEADLOCK_RETRIES = 10; // The lock used to provide exclusive access to the thread that // close the db (shutdown or clear). private ReentrantReadWriteLock dbCloseLock; /** * Creates a new database or open existing database that will be used * to store and retrieve changes from an LDAP server. @@ -86,6 +91,8 @@ db = dbenv.getOrAddDb(serverId, baseDn, replicationServer.getReplicationServerDomain(baseDn, true).getGenerationId()); dbCloseLock = new ReentrantReadWriteLock(true); } /** @@ -108,6 +115,7 @@ // the operation is attempted again up to DEADLOCK_RETRIES times. while ((tries++ < DEADLOCK_RETRIES) && (!done)) { dbCloseLock.readLock().lock(); try { txn = dbenv.beginTransaction(); @@ -128,6 +136,10 @@ txn.abort(); txn = null; } finally { dbCloseLock.readLock().unlock(); } } if (!done) { @@ -190,8 +202,17 @@ { try { dbCloseLock.writeLock().lock(); try { db.close(); } catch (DatabaseException e) } finally { dbCloseLock.writeLock().unlock(); } } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(NOTE_EXCEPTION_CLOSING_DATABASE.get(this.toString())); @@ -223,6 +244,7 @@ * @throws DatabaseException If a database error prevented the cursor * creation. * @throws Exception if the ReplServerDBCursor creation failed. * * @return The ReplServerDBCursor. */ public ReplServerDBCursor openDeleteCursor() @@ -242,9 +264,12 @@ try { dbCloseLock.readLock().lock(); cursor = db.openCursor(null, null); } catch (DatabaseException e1) } catch (DatabaseException e1) { dbCloseLock.readLock().unlock(); return null; } try @@ -253,6 +278,7 @@ DatabaseEntry data = new DatabaseEntry(); OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); cursor.close(); dbCloseLock.readLock().unlock(); if (status != OperationStatus.SUCCESS) { /* database is empty */ @@ -268,11 +294,14 @@ return new ChangeNumber(str); } catch (DatabaseException e) { try { try { cursor.close(); dbCloseLock.readLock().unlock(); } catch (DatabaseException dbe) { // The db is dead - let's only log. } /* database is faulty */ MessageBuilder mb = new MessageBuilder(); @@ -295,11 +324,13 @@ try { dbCloseLock.readLock().lock(); cursor = db.openCursor(null, null); DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT); cursor.close(); dbCloseLock.readLock().unlock(); if (status != OperationStatus.SUCCESS) { /* database is empty */ @@ -355,10 +386,14 @@ private ReplServerDBCursor(ChangeNumber startingChangeNumber) throws Exception { cursor = db.openCursor(txn, null); try { // Take the lock. From now on, whatever error that happen in the life // of this cursor should end by unlocking that lock. We must also // unlock it when throwing an exception. dbCloseLock.readLock().lock(); cursor = db.openCursor(txn, null); if (startingChangeNumber != null) { key = new ReplicationKey(startingChangeNumber); @@ -367,27 +402,44 @@ if (cursor.getSearchKey(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { // We could not move the cursor to the expected startingChangeNumber if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { // We could not even move the cursor closed to it => failure // Unlocking is required before throwing any exception dbCloseLock.readLock().unlock(); throw new Exception("ChangeNumber not available"); } else { // We can move close to the startingChangeNumber. // Let's create a cursor from that point. DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); if (cursor.getPrev(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { try { cursor.close(); cursor = db.openCursor(txn, null); } catch(Exception e) { // Unlocking is required before throwing any exception dbCloseLock.readLock().unlock(); throw(e); } } } } } } catch (Exception e) { // Unlocking is required before throwing any exception dbCloseLock.readLock().unlock(); cursor.close(); throw (e); } @@ -395,9 +447,19 @@ private ReplServerDBCursor() throws DatabaseException { try { // We'll go on only if no close or no clear is running dbCloseLock.readLock().lock(); txn = dbenv.beginTransaction(); cursor = db.openCursor(txn, null); } catch(DatabaseException e) { dbCloseLock.readLock().unlock(); throw (e); } } /** * Close the ReplicationServer Cursor. @@ -414,12 +476,15 @@ } catch (DatabaseException e) { dbCloseLock.readLock().unlock(); MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); } if (txn != null) { try @@ -434,6 +499,7 @@ replicationServer.shutdown(); } } dbCloseLock.readLock().unlock(); } /** @@ -460,6 +526,8 @@ } catch (DatabaseException e) { dbCloseLock.readLock().unlock(); MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); @@ -480,6 +548,7 @@ replicationServer.shutdown(); } } dbCloseLock.readLock().unlock(); } /** @@ -556,7 +625,7 @@ { cursor.delete(); } } } // ReplServerDBCursor /** * Clears this change DB from the changes it contains. @@ -566,10 +635,38 @@ */ public void clear() throws Exception, DatabaseException { // Clears the changes dbenv.clearDb(this.toString()); // The coming users will be blocked until the clear is done dbCloseLock.writeLock().lock(); try { String dbName = db.getDatabaseName(); // Clears the reference to this serverID dbenv.clearServerId(baseDn, serverId); // Closing is requested by the Berkeley DB before truncate db.close(); // Clears the changes dbenv.clearDb(dbName); db = null; // RE-create the db db = dbenv.getOrAddDb(serverId, baseDn, (long)-1); } catch(Exception e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_ERROR_CLEARING_DB.get(this.toString(), e.getMessage() + " " + stackTraceToSingleLineString(e))); logError(mb.toMessage()); } finally { // Relax the waiting users dbCloseLock.writeLock().unlock(); } } } opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -526,22 +526,31 @@ */ public final void clearDb(String databaseName) { Transaction txn = null; try { if (debugEnabled()) TRACER.debugInfo( "In " + this.replicationServer.getMonitorInstanceName() + "clearDb" + databaseName); Transaction txn = dbEnvironment.beginTransaction(null, null); txn = dbEnvironment.beginTransaction(null, null); dbEnvironment.truncateDatabase(txn, databaseName, false); txn.commitWriteNoSync(); txn = null; } catch (DatabaseException dbe) catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_ERROR_CLEARING_DB.get(databaseName, dbe.getLocalizedMessage())); e.getMessage() + " " + stackTraceToSingleLineString(e))); logError(mb.toMessage()); } finally { try { if (txn != null) txn.abort(); } catch(Exception e) {} } } } opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1196,9 +1196,11 @@ catch (Exception e) { // TODO: i18n logError(Message.raw( "Exception caught while clearing dbHandler:" + e.getLocalizedMessage())); MessageBuilder mb = new MessageBuilder(); mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(), e.getMessage() + " " + stackTraceToSingleLineString(e))); logError(mb.toMessage()); } } sourceDbHandlers.clear(); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -54,6 +54,7 @@ import org.opends.server.core.DirectoryServer; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.protocols.internal.InternalSearchOperation; import org.opends.server.replication.common.ChangeNumberGenerator; import org.opends.server.replication.plugin.ReplicationBroker; import org.opends.server.replication.plugin.ReplicationDomain; @@ -65,7 +66,9 @@ import org.opends.server.replication.protocol.ReplicationMessage; import org.opends.server.replication.protocol.SocketSession; import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationBackend; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.schema.DirectoryStringSyntax; import org.opends.server.tasks.LdifFileWriter; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; @@ -73,6 +76,9 @@ import org.opends.server.types.DN; import org.opends.server.types.Entry; import org.opends.server.types.ResultCode; import org.opends.server.types.SearchFilter; import org.opends.server.types.SearchResultEntry; import org.opends.server.types.SearchScope; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -688,6 +694,38 @@ return addMsg; } /* * Check that the expected number of changes are in the replication * server database. */ private void checkChangelogSize(int expectedCount) { try { SearchFilter filter = SearchFilter.createFilterFromString("(objectclass=*)"); InternalSearchOperation searchOperation = connection.processSearch(DN.decode("dc=replicationchanges"), SearchScope.SUBORDINATE_SUBTREE, filter); if (debugEnabled()) { if (searchOperation.getSearchEntries().size() != expectedCount) { for (SearchResultEntry sre : searchOperation.getSearchEntries()) { debugInfo("Entry found: " + sre.toLDIFString()); } } } assertEquals(searchOperation.getSearchEntries().size(), expectedCount); } catch(Exception e) { } } /** * SingleRS tests basic features of generationID * with one single Replication Server. @@ -712,6 +750,13 @@ replServer1 = createReplicationServer(changelog1ID, false, testCase); // To search the replication server db later in these tests, we need // to attach the search backend to the replication server just created. Thread.sleep(500); ReplicationBackend b = (ReplicationBackend)DirectoryServer.getBackend("replicationChanges"); b.setServer(replServer1); /* * Test : empty replicated backend * Check : nothing is broken - no generationId generated @@ -824,6 +869,11 @@ String ent1[] = { createEntry(UUID.randomUUID()) }; this.addTestEntriesToDB(ent1); // Verify that the replication server does contain the change related // to this ADD. Thread.sleep(500); checkChangelogSize(1); try { ReplicationMessage msg = broker3.receive(); @@ -846,8 +896,14 @@ debugInfo("Create again replServer1"); replServer1 = createReplicationServer(changelog1ID, false, testCase); // To search the replication server db later in these tests, we need // to attach the search backend to the replication server just created. Thread.sleep(500); b = (ReplicationBackend)DirectoryServer.getBackend("replicationChanges"); b.setServer(replServer1); debugInfo("Delay to allow DS to reconnect to replServer1"); Thread.sleep(1000); long genIdAfterRestart = replServer1.getGenerationId(baseDn); debugInfo("Aft restart / replServer.genId=" + genIdAfterRestart); @@ -874,15 +930,10 @@ fail("Broker connection is expected to be accepted."); } /* * * FIXME Should clearJEBackend() regenerate generationId and do a start * against ReplicationServer ? */ /* * Test: Reset the replication server in order to allow new data set. */ // Also verify that no change occured on the replication server db // and still contain the ADD submitted initially. Thread.sleep(500); checkChangelogSize(1); debugInfo("Launch an on-line import on DS."); genId=-1; @@ -905,7 +956,9 @@ waitTaskState(taskReset, TaskState.COMPLETED_SUCCESSFULLY, null); Thread.sleep(200); // TODO: Test that replication server db has been cleared // FIXME: Known bug : the replication server db should not have been // cleared in that case since it has the proper generation ID. checkChangelogSize(0); debugInfo("Expect new genId to be computed on DS and sent to all replServers after on-line import."); genId = readGenId(); @@ -926,7 +979,6 @@ isDegradedDueToGenerationId(server3ID), "Expecting that broker3 is degraded since domain genId has been reset"); // Now create a change that normally would be replicated // but will not be replicated here since DS and brokers are degraded String[] ent3 = { createEntry(UUID.randomUUID()) };