opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -626,9 +626,10 @@ return; } DBCursor<UpdateMsg> cursor = rsDomain.getCursorFrom(previousCSN); DBCursor<UpdateMsg> cursor = null; try { cursor = rsDomain.getCursorFrom(previousCSN); int lookthroughCount = 0; // Walk through the changes opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1281,9 +1281,12 @@ * @param startAfterCSN * Starting point for the cursor. If null, start from the oldest CSN * @return a non null {@link DBCursor} * @throws ChangelogException * If a database problem happened * @see ReplicationDomainDB#getCursorFrom(DN, CSN) */ public DBCursor<UpdateMsg> getCursorFrom(CSN startAfterCSN) throws ChangelogException { return domainDB.getCursorFrom(baseDN, startAfterCSN); } @@ -1302,9 +1305,12 @@ * Starting point for the replicaDB cursors. If null, start from the * oldest CSN * @return a non null {@link DBCursor} going from oldest to newest CSN * @throws ChangelogException * If a database problem happened * @see ReplicationDomainDB#getCursorFrom(DN, ServerState) */ public DBCursor<UpdateMsg> getCursorFrom(ServerState startAfterServerState) throws ChangelogException { return domainDB.getCursorFrom(baseDN, startAfterServerState); } opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -177,9 +177,12 @@ * Starting point for each ReplicaDB cursor. If null, start from the * oldest CSN for each ReplicaDB cursor. * @return a non null {@link DBCursor} * @throws ChangelogException * If a database problem happened * @see #getCursorFrom(DN, ServerState) */ DBCursor<UpdateMsg> getCursorFrom(DN baseDN, CSN startAfterCSN); DBCursor<UpdateMsg> getCursorFrom(DN baseDN, CSN startAfterCSN) throws ChangelogException; /** * Generates a {@link DBCursor} across all the replicaDBs for the specified @@ -199,10 +202,12 @@ * replicaDB is null, then start from the oldest CSN for this * replicaDB * @return a non null {@link DBCursor} * @throws ChangelogException * If a database problem happened * @see #getCursorFrom(DN, CSN) */ DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startAfterServerState); ServerState startAfterServerState) throws ChangelogException; /** * for the specified serverId and replication domain. opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -35,6 +35,7 @@ import org.opends.messages.Message; import org.opends.messages.MessageBuilder; import org.opends.server.config.ConfigException; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.UpdateMsg; @@ -42,11 +43,13 @@ import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.changelog.api.*; import org.opends.server.types.DN; import org.opends.server.types.DebugLogLevel; import org.opends.server.util.Pair; import org.opends.server.util.StaticUtils; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.util.StaticUtils.*; /** @@ -54,6 +57,8 @@ */ public class JEChangelogDB implements ChangelogDB, ReplicationDomainDB { /** The tracer object for the debug logger. */ protected static final DebugTracer TRACER = getTracer(); /** * {@link DBCursor} implementation that iterates across all the ReplicaDBs of @@ -83,6 +88,7 @@ }); public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState) throws ChangelogException { this.baseDN = baseDN; for (int serverId : getDomainMap(baseDN).keySet()) @@ -94,22 +100,15 @@ } private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN) CSN startAfterCSN) throws ChangelogException { JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); if (replicaDB != null) { try { DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startAfterCSN); cursor.next(); return cursor; } catch (ChangelogException e) { // ignored } DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startAfterCSN); cursor.next(); return cursor; } return EMPTY_CURSOR; } @@ -262,7 +261,10 @@ } catch (Exception e) { MessageBuilder mb = new MessageBuilder(); if (debugEnabled()) TRACER.debugCaught(DebugLogLevel.ERROR, e); final MessageBuilder mb = new MessageBuilder(); mb.append(e.getLocalizedMessage()); mb.append(" "); mb.append(String.valueOf(dbDirectory)); @@ -410,6 +412,9 @@ } catch (ChangelogException e) { if (debugEnabled()) TRACER.debugCaught(DebugLogLevel.ERROR, e); logError(ERR_COULD_NOT_READ_DB.get(this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage())); } @@ -537,6 +542,8 @@ { firstException = e; } else if (debugEnabled()) TRACER.debugCaught(DebugLogLevel.ERROR, e); } cnIndexDB = null; @@ -682,6 +689,8 @@ { firstException = e; } else if (debugEnabled()) TRACER.debugCaught(DebugLogLevel.ERROR, e); } } } @@ -697,6 +706,8 @@ { firstException = e; } else if (debugEnabled()) TRACER.debugCaught(DebugLogLevel.ERROR, e); } if (firstException != null) @@ -748,6 +759,8 @@ } catch (Exception e) { if (debugEnabled()) TRACER.debugCaught(DebugLogLevel.ERROR, e); logError(ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage())); } } @@ -765,6 +778,7 @@ /** {@inheritDoc} */ @Override public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, CSN startAfterCSN) throws ChangelogException { // Builds a new serverState for all the serverIds in the replication domain // to ensure we get cursors starting after the provided CSN. @@ -774,7 +788,7 @@ /** {@inheritDoc} */ @Override public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startAfterServerState) ServerState startAfterServerState) throws ChangelogException { return new CrossReplicaDBCursor(baseDN, startAfterServerState); } opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -1248,7 +1248,7 @@ // FIXME ECL In the handshake phase two, should RS send back a topo msg ? if (debugEnabled()) { TRACER.debugInfo("RB HANDSHAKE SENT:\n" + startECLSessionMsg); debugInfo("RB HANDSHAKE SENT:\n" + startECLSessionMsg); } // Alright set the timeout to the desired value @@ -1310,11 +1310,11 @@ /* * Read the TopologyMsg that should come back. */ TopologyMsg topologyMsg = (TopologyMsg) localSession.receive(); final TopologyMsg topologyMsg = (TopologyMsg) localSession.receive(); if (debugEnabled()) { TRACER.debugInfo("RB HANDSHAKE SENT:\n" + startSessionMsg debugInfo("RB HANDSHAKE SENT:\n" + startSessionMsg + "\nAND RECEIVED:\n" + topologyMsg); }