opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -42,6 +42,7 @@ import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicationIterator; import org.opends.server.types.Attribute; import org.opends.server.types.Attributes; import org.opends.server.types.InitializationException; @@ -276,7 +277,7 @@ { flush(); } return new ReplicationIterator(db, changeNumber, this); return new JEReplicationIterator(db, changeNumber, this); } /** opends/src/server/org/opends/server/replication/server/JEReplicationIterator.java
File was renamed from opends/src/server/org/opends/server/replication/server/ReplicationIterator.java @@ -32,12 +32,12 @@ import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicationIterator; /** * This class allows to iterate through the changes received from a given * LDAP Server Identifier. * Berkeley DB JE implementation of IReplicationIterator. */ public class ReplicationIterator public class JEReplicationIterator implements ReplicationIterator { private UpdateMsg currentChange = null; private ReplServerDBCursor cursor = null; @@ -55,7 +55,7 @@ * @param dbHandler The associated DbHandler. * @throws ChangelogException if a database problem happened. */ public ReplicationIterator(ReplicationDB db, ChangeNumber changeNumber, public JEReplicationIterator(ReplicationDB db, ChangeNumber changeNumber, DbHandler dbHandler) throws ChangelogException { this.db = db; @@ -86,20 +86,15 @@ } } /** * Get the UpdateMsg where the iterator is currently set. * @return The UpdateMsg where the iterator is currently set. */ /** {@inheritDoc} */ @Override public UpdateMsg getChange() { return currentChange; } /** * Go to the next change in the ReplicationDB or in the server Queue. * @return false if the iterator is already on the last change before * this call. */ /** {@inheritDoc} */ @Override public boolean next() { currentChange = cursor.next(); @@ -136,11 +131,8 @@ return currentChange != null; } /** * Release the resources and locks used by this Iterator. * This method must be called when the iterator is no longer used. * Failure to do it could cause DB deadlock. */ /** {@inheritDoc} */ @Override public void releaseCursor() { synchronized (this) opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -27,9 +27,6 @@ */ package org.opends.server.replication.server; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.debug.DebugLogger.*; import java.util.ArrayList; import java.util.List; import java.util.SortedSet; @@ -45,8 +42,12 @@ import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.changelog.api.*; import org.opends.server.types.*; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.debug.DebugLogger.*; /** * This class implements a buffering/producer/consumer mechanism of * replication changes (UpdateMsg) used inside the replication server. @@ -281,17 +282,15 @@ * load this change on the delayList * */ ReplicationIteratorComparator comparator = new ReplicationIteratorComparator(); SortedSet<ReplicationIterator> iteratorSortedSet = new TreeSet<ReplicationIterator>(comparator); new TreeSet<ReplicationIterator>( new ReplicationIteratorComparator()); try { /* fill the lateQueue */ for (int serverId : replicationServerDomain.getServers()) { ChangeNumber lastCsn = serverState .getChangeNumber(serverId); ChangeNumber lastCsn = serverState.getChangeNumber(serverId); ReplicationIterator iterator = replicationServerDomain .getChangelogIterator(serverId, lastCsn); if (iterator != null) @@ -318,8 +317,7 @@ && (lateQueue.count() < 100) && (lateQueue.bytesCount() < 50000)) { ReplicationIterator iterator = iteratorSortedSet .first(); ReplicationIterator iterator = iteratorSortedSet.first(); iteratorSortedSet.remove(iterator); lateQueue.add(iterator.getChange()); if (iterator.next()) @@ -376,7 +374,7 @@ { msg1 = msgQueue.removeFirst(); } while (!msg.getChangeNumber().equals(msg1.getChangeNumber())); this.updateServerState(msg); updateServerState(msg); return msg1; } } @@ -388,7 +386,7 @@ { msg = lateQueue.removeFirst(); } this.updateServerState(msg); updateServerState(msg); return msg; } } @@ -412,7 +410,7 @@ } msg = msgQueue.removeFirst(); if (this.updateServerState(msg)) if (updateServerState(msg)) { /* * Only push the message if it has not yet been seen @@ -462,10 +460,9 @@ there. So let's take the last change not sent directly from the db. */ ReplicationIteratorComparator comparator = new ReplicationIteratorComparator(); SortedSet<ReplicationIterator> iteratorSortedSet = new TreeSet<ReplicationIterator>(comparator); new TreeSet<ReplicationIterator>( new ReplicationIteratorComparator()); try { // Build a list of candidates iterator (i.e. db i.e. server) opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -72,6 +72,7 @@ import org.opends.server.replication.protocol.ModifyDNMsg; import org.opends.server.replication.protocol.ModifyMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.changelog.api.ReplicationIterator; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeBuilder; import org.opends.server.types.AttributeType; opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -47,6 +47,7 @@ import org.opends.server.replication.common.*; import org.opends.server.replication.protocol.*; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicationIterator; import org.opends.server.types.*; import org.opends.server.util.TimeThread; opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIterator.java
New file @@ -0,0 +1,60 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2013 ForgeRock AS */ package org.opends.server.replication.server.changelog.api; import org.opends.server.replication.protocol.UpdateMsg; /** * This interface allows to iterate through the changes received from a given * LDAP Server Identifier. */ public interface ReplicationIterator { /** * Get the UpdateMsg where the iterator is currently set. * * @return The UpdateMsg where the iterator is currently set. */ UpdateMsg getChange(); /** * Go to the next change in the ReplicationDB or in the server Queue. * * @return false if the iterator is already on the last change before this * call. */ boolean next(); /** * Release the resources and locks used by this Iterator. This method must be * called when the iterator is no longer used. Failure to do it could cause DB * deadlock. */ void releaseCursor(); } opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIteratorComparator.java
File was renamed from opends/src/server/org/opends/server/replication/server/ReplicationIteratorComparator.java @@ -23,8 +23,9 @@ * * * Copyright 2006-2008 Sun Microsystems, Inc. * Portions copyright 2013 ForgeRock AS */ package org.opends.server.replication.server; package org.opends.server.replication.server.changelog.api; import java.util.Comparator; @@ -45,6 +46,7 @@ * @param o2 second ReplicationIterator. * @return result of the comparison. */ @Override public int compare(ReplicationIterator o1, ReplicationIterator o2) { ChangeNumber csn1 = o1.getChange().getChangeNumber(); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -42,6 +42,7 @@ import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.ChangeNumberGenerator; import org.opends.server.replication.protocol.DeleteMsg; import org.opends.server.replication.server.changelog.api.ReplicationIterator; import org.testng.annotations.Test; /**