opends/src/server/org/opends/server/replication/server/ChangelogState.java
New file @@ -0,0 +1,104 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 * Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 * Copyright 2013 ForgeRock AS
 */
package org.opends.server.replication.server;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * In-memory representation of the changelog state stored in the
 * changelogStateDB. For every replication domain it records:
 * <ul>
 * <li>the generationId of the domain</li>
 * <li>the list of serverIds that compose the domain</li>
 * </ul>
 * <p>
 * Replication initialization relies on this class to keep the code reading
 * the changelogStateDB decoupled from the code that consumes its data.
 */
public class ChangelogState
{

  /** Maps a replication domain baseDN to its generationId. */
  private final Map<String, Long> domainToGenerationId =
      new HashMap<String, Long>();

  /** Maps a replication domain baseDN to the serverIds recorded for it. */
  private final Map<String, List<Integer>> domainToServerIds =
      new HashMap<String, List<Integer>>();

  /**
   * Records the generationId of the supplied replication domain, replacing
   * any value previously stored for that domain.
   *
   * @param baseDn
   *          the targeted replication domain baseDN
   * @param generationId
   *          the generation Id to set
   */
  public void setDomainGenerationId(String baseDn, long generationId)
  {
    this.domainToGenerationId.put(baseDn, Long.valueOf(generationId));
  }

  /**
   * Appends the serverId to the list of serverIds kept for the supplied
   * replication domain. The list is created on first use for a domain.
   *
   * @param serverId
   *          the serverId to add
   * @param baseDn
   *          the targeted replication domain baseDN
   */
  public void addServerIdToDomain(int serverId, String baseDn)
  {
    final List<Integer> knownIds = this.domainToServerIds.get(baseDn);
    if (knownIds != null)
    {
      // The domain is already known: simply extend its list.
      knownIds.add(Integer.valueOf(serverId));
      return;
    }

    // First serverId seen for this domain: create and register its list.
    final List<Integer> freshIds = new LinkedList<Integer>();
    freshIds.add(Integer.valueOf(serverId));
    this.domainToServerIds.put(baseDn, freshIds);
  }

  /**
   * Returns the Map of {@code domainBaseDN => generationId}. The returned
   * object is the internal map itself, not a copy.
   *
   * @return a Map of {@code domainBaseDN => generationId}
   */
  public Map<String, Long> getDomainToGenerationId()
  {
    return this.domainToGenerationId;
  }

  /**
   * Returns the Map of {@code domainBaseDN => List<serverId>}. The returned
   * object is the internal map itself, not a copy.
   *
   * @return a Map of {@code domainBaseDN => List<serverId>}
   */
  public Map<String, List<Integer>> getDomainToServerIds()
  {
    return this.domainToServerIds;
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -27,7 +27,6 @@ */ package org.opends.server.replication.server; import java.io.File; import java.io.IOException; import java.io.StringReader; import java.net.*; @@ -54,10 +53,9 @@ import org.opends.server.replication.protocol.*; import org.opends.server.replication.server.changelog.api.CNIndexRecord; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.je.DbHandler; import org.opends.server.replication.server.changelog.je.DraftCNDbHandler; import org.opends.server.replication.server.changelog.je.ReplicationDbEnv; import org.opends.server.replication.server.changelog.je.JEChangelogDB; import org.opends.server.types.*; import org.opends.server.util.LDIFReader; import org.opends.server.util.ServerConstants; @@ -101,10 +99,9 @@ new HashMap<String, ReplicationServerDomain>(); private volatile boolean shutdown = false; private ReplicationDbEnv dbEnv; private int rcvWindow; private int queueSize; private String dbDirname = null; private final ChangelogDB changelogDB = new JEChangelogDB(this); /** * The delay (in sec) after which the changes must be deleted from the @@ -225,30 +222,11 @@ replicationServerUrls = new ArrayList<String>(); queueSize = configuration.getQueueSize(); purgeDelay = configuration.getReplicationPurgeDelay(); dbDirname = configuration.getReplicationDBDirectory(); rcvWindow = configuration.getWindowSize(); if (dbDirname == null) { dbDirname = "changelogDb"; } // Check that this path exists or create it. 
File f = getFileForPath(dbDirname); try { if (!f.exists()) { f.mkdir(); } } catch (Exception e) { MessageBuilder mb = new MessageBuilder(); mb.append(e.getLocalizedMessage()); mb.append(" "); mb.append(String.valueOf(getFileForPath(dbDirname))); Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()); throw new ConfigException(msg, e); } this.changelogDB.setReplicationDBDirectory(configuration .getReplicationDBDirectory()); groupId = (byte)configuration.getGroupId(); weight = configuration.getWeight(); assuredTimeout = configuration.getAssuredTimeout(); @@ -504,10 +482,7 @@ try { // Initialize the replicationServer database. dbEnv = new ReplicationDbEnv(getFileForPath(dbDirname).getAbsolutePath(), this); dbEnv.initializeFromChangelogStateDB(); this.changelogDB.initializeDB(); setServerURL(); listenSocket = new ServerSocket(); @@ -539,16 +514,9 @@ if (debugEnabled()) TRACER.debugInfo("RS " +getMonitorInstanceName()+ " successfully initialized"); } catch (ChangelogException e) { Message message = ERR_COULD_NOT_READ_DB.get( getFileForPath(dbDirname).getAbsolutePath(), e.getLocalizedMessage()); logError(message); } catch (UnknownHostException e) { Message message = ERR_UNKNOWN_HOSTNAME.get(); logError(message); logError(ERR_UNKNOWN_HOSTNAME.get()); } catch (IOException e) { Message message = @@ -827,37 +795,12 @@ shutdownECL(); if (dbEnv != null) { dbEnv.shutdown(); } this.changelogDB.shutdownDB(); // Remove this instance from the global instance list allInstances.remove(this); } /** * Creates a new DB handler for this ReplicationServer and the serverId and DN * given in parameter. * * @param serverId * The serverId for which the dbHandler must be created. * @param baseDn * The DN for which the dbHandler must be created. * @return The new DB handler for this ReplicationServer and the serverId and * DN given in parameter. * @throws ChangelogException * in case of underlying database problem. 
*/ public DbHandler newDbHandler(int serverId, String baseDn) throws ChangelogException { return new DbHandler(serverId, baseDn, this, dbEnv, queueSize); } /** * Clears the generationId for the replicationServerDomain related to the * provided baseDn. @@ -867,18 +810,6 @@ */ public void clearGenerationId(String baseDn) { try { dbEnv.clearGenerationId(baseDn); } catch (Exception ignored) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.WARNING, ignored); } } synchronized (cnIndexDBLock) { if (cnIndexDB != null) @@ -962,11 +893,7 @@ if (newPurgeDelay != purgeDelay) { purgeDelay = newPurgeDelay; // propagate for (ReplicationServerDomain domain : getReplicationServerDomains()) { domain.setPurgeDelay(purgeDelay*1000); } this.changelogDB.setPurgeDelay(purgeDelay * 1000); } rcvWindow = configuration.getWindowSize(); @@ -1047,7 +974,7 @@ } final String newDir = configuration.getReplicationDBDirectory(); if (newDir != null && !dbDirname.equals(newDir)) if (newDir != null && !this.changelogDB.getDBDirName().equals(newDir)) { return new ConfigChangeResult(ResultCode.SUCCESS, true); } @@ -1597,7 +1524,7 @@ * @throws DirectoryException * when needed. */ public ChangeNumberIndexDB getChangeNumberIndexDB() throws DirectoryException ChangeNumberIndexDB getChangeNumberIndexDB() throws DirectoryException { synchronized (cnIndexDBLock) { @@ -1605,9 +1532,9 @@ { if (cnIndexDB == null) { cnIndexDB = new DraftCNDbHandler(this, this.dbEnv); cnIndexDB = this.changelogDB.newChangeNumberIndexDB(); final CNIndexRecord lastCNRecord = cnIndexDB.getLastRecord(); // initialization of the lastGeneratedChangeNumebr from the DB content // initialization of the lastGeneratedChangeNumber from the DB content // if DB is empty => last record does not exist => default to 0 lastGeneratedChangeNumber = (lastCNRecord != null) ? 
lastCNRecord.getChangeNumber() : 0; @@ -1617,7 +1544,8 @@ catch (Exception e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); Message message = ERR_CHANGENUMBER_DATABASE.get(e.getMessage()); Message message = ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage()); throw new DirectoryException(OPERATIONS_ERROR, message, e); } } @@ -1816,6 +1744,16 @@ } /** * Returns the changelogDB. * * @return the changelogDB. */ ChangelogDB getChangelogDB() { return this.changelogDB; } /** * Get the replication server DB directory. * This is useful for tests to be able to do some cleanup. Might even be * useful for the server some day. @@ -1824,7 +1762,7 @@ */ public String getDbDirName() { return dbDirname; return this.changelogDB.getDBDirName(); } /* @@ -1896,33 +1834,4 @@ + baseDNs.keySet(); } /** * Initializes the generationId for the specified replication domain. * * @param baseDn * the replication domain * @param generationId * the the generationId value for initialization */ public void initDomainGenerationID(String baseDn, long generationId) { getReplicationServerDomain(baseDn, true).initGenerationID(generationId); } /** * Adds the specified serverId to the specified replication domain. * * @param serverId * the server Id to add to the replication domain * @param baseDn * the replication domain where to add the serverId * @throws ChangelogException * If a database error happened. */ public void addServerIdToDomain(int serverId, String baseDn) throws ChangelogException { DbHandler dbHandler = newDbHandler(serverId, baseDn); getReplicationServerDomain(baseDn, true).setDbHandler(serverId, dbHandler); } } opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -30,6 +30,7 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -46,9 +47,9 @@ import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.*; import org.opends.server.replication.protocol.*; import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicaDBCursor; import org.opends.server.replication.server.changelog.je.DbHandler; import org.opends.server.types.*; import static org.opends.messages.ReplicationMessages.*; @@ -117,17 +118,26 @@ private final Queue<MessageHandler> otherHandlers = new ConcurrentLinkedQueue<MessageHandler>(); /** * This map contains the List of updates received from each LDAP server. */ private final Map<Integer, DbHandler> sourceDbHandlers = new ConcurrentHashMap<Integer, DbHandler>(); private final ChangelogDB changelogDB; /** The ReplicationServer that created the current instance. */ private ReplicationServer localReplicationServer; /** GenerationId management. */ /** * The generationId of the current replication domain. The generationId is * computed by hashing the first 1000 entries in the DB. */ private volatile long generationId = -1; private boolean generationIdSavedStatus = false; /** * JNR, this is legacy code, hard to follow logic. I think what this field * tries to say is: "is the generationId in use anywhere?", i.e. is there a * replication topology in place? As soon as an answer to any of these * question comes true, then it is set to true. * <p> * It looks like the only use of this field is to prevent the * {@link #generationId} from being reset by * {@link #resetGenerationIdIfPossible()}. 
*/ private volatile boolean generationIdSavedStatus = false; /** The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); @@ -177,6 +187,7 @@ this.assuredTimeoutTimer = new Timer("Replication server RS(" + localReplicationServer.getServerId() + ") assured timer for domain \"" + baseDn + "\"", true); this.changelogDB = localReplicationServer.getChangelogDB(); DirectoryServer.registerMonitorProvider(this); } @@ -252,7 +263,7 @@ } } if (!publishMessage(update, serverId)) if (!publishUpdateMsg(update, serverId)) { return; } @@ -390,43 +401,46 @@ } } private boolean publishMessage(UpdateMsg update, int serverId) private boolean publishUpdateMsg(UpdateMsg updateMsg, int serverId) { // look for the dbHandler that is responsible for the LDAP server which // generated the change. DbHandler dbHandler; synchronized (sourceDbHandlers) try { dbHandler = sourceDbHandlers.get(serverId); if (dbHandler == null) if (this.changelogDB.publishUpdateMsg(baseDn, serverId, updateMsg)) { try { dbHandler = localReplicationServer.newDbHandler(serverId, baseDn); generationIdSavedStatus = true; } catch (ChangelogException e) /* * JNR: Matt and I had a hard time figuring out where to put this * synchronized block. We elected to put it here, but without a strong * conviction. */ synchronized (generationIDLock) { /* * Because of database problem we can't save any more changes * from at least one LDAP server. * This replicationServer therefore can't do it's job properly anymore * and needs to close all its connections and shutdown itself. * JNR: I think the generationIdSavedStatus is set to true because * method above created a ReplicaDB which assumes the generationId was * communicated to another server. Hence setting true on this field * prevent the generationId from being reset. 
*/ MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(" "); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); localReplicationServer.shutdown(); return false; generationIdSavedStatus = true; } sourceDbHandlers.put(serverId, dbHandler); } return true; } // Publish the messages to the source handler dbHandler.add(update); return true; catch (ChangelogException e) { /* * Because of database problem we can't save any more changes from at * least one LDAP server. This replicationServer therefore can't do it's * job properly anymore and needs to close all its connections and * shutdown itself. */ MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(" "); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); localReplicationServer.shutdown(); return false; } } private NotAssuredUpdateMsg addUpdate(ServerHandler sHandler, @@ -1261,7 +1275,7 @@ */ public Set<Integer> getServerIds() { return sourceDbHandlers.keySet(); return changelogDB.getDomainServerIds(baseDn); } /** @@ -1278,29 +1292,7 @@ */ public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN) { DbHandler dbHandler = sourceDbHandlers.get(serverId); if (dbHandler == null) { return null; } ReplicaDBCursor cursor; try { cursor = dbHandler.generateCursorFrom(startAfterCSN); } catch (Exception e) { return null; } if (!cursor.next()) { close(cursor); return null; } return cursor; return changelogDB.getCursorFrom(baseDn, serverId, startAfterCSN); } /** @@ -1313,12 +1305,7 @@ */ public long getCount(int serverId, CSN from, CSN to) { DbHandler dbHandler = sourceDbHandlers.get(serverId); if (dbHandler != null) { return dbHandler.getCount(from, to); } return 0; return changelogDB.getCount(baseDn, serverId, from, to); } /** @@ -1328,12 +1315,7 @@ */ public long getChangesCount() { long entryCount = 0; for (DbHandler dbHandler : sourceDbHandlers.values()) { entryCount += 
dbHandler.getChangesCount(); } return entryCount; return changelogDB.getDomainChangesCount(baseDn); } /** @@ -1346,24 +1328,6 @@ } /** * Sets the provided DbHandler associated to the provided serverId. * * @param serverId the serverId for the server to which is * associated the DbHandler. * @param dbHandler the dbHandler associated to the serverId. * * @throws ChangelogException If a database error happened. */ public void setDbHandler(int serverId, DbHandler dbHandler) throws ChangelogException { synchronized (sourceDbHandlers) { sourceDbHandlers.put(serverId, dbHandler); } } /** * Retrieves the destination handlers for a routable message. * * @param msg The message to route. @@ -1734,20 +1698,7 @@ stopAllServers(true); shutdownDbHandlers(); } /** Shutdown all the dbHandlers. */ private void shutdownDbHandlers() { synchronized (sourceDbHandlers) { for (DbHandler dbHandler : sourceDbHandlers.values()) { dbHandler.shutdown(); } sourceDbHandlers.clear(); } changelogDB.shutdownDomain(baseDn); } /** @@ -1758,9 +1709,9 @@ public ServerState getDbServerState() { ServerState serverState = new ServerState(); for (DbHandler db : sourceDbHandlers.values()) for (CSN lastCSN : changelogDB.getDomainLastCSNs(baseDn).values()) { serverState.update(db.getLastChange()); serverState.update(lastCSN); } return serverState; } @@ -2235,24 +2186,7 @@ public void clearDbs() { // Reset the localchange and state db for the current domain synchronized (sourceDbHandlers) { for (DbHandler dbHandler : sourceDbHandlers.values()) { try { dbHandler.clear(); } catch (Exception e) { // TODO: i18n MessageBuilder mb = new MessageBuilder(); mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(), e.getMessage() + " " + stackTraceToSingleLineString(e))); logError(mb.toMessage()); } } shutdownDbHandlers(); } changelogDB.clearDomain(baseDn); try { localReplicationServer.clearGenerationId(baseDn); @@ -2397,20 +2331,6 @@ } /** * Set the purge delay on all the db Handlers for this Domain * of Replication. 
* * @param delay The new purge delay to use. */ public void setPurgeDelay(long delay) { for (DbHandler dbHandler : sourceDbHandlers.values()) { dbHandler.setPurgeDelay(delay); } } /** * Get the map of connected DSs. * @return The map of connected DSs */ @@ -2667,7 +2587,6 @@ { for (int serverId : dbState) { DbHandler h = sourceDbHandlers.get(serverId); CSN mostRecentDbCSN = dbState.getCSN(serverId); try { // Is the most recent change in the Db newer than eligible CSN ? @@ -2676,19 +2595,8 @@ if (eligibleCSN.olderOrEqual(mostRecentDbCSN)) { // let's try to seek the first change <= eligibleCSN ReplicaDBCursor cursor = null; try { cursor = h.generateCursorFrom(eligibleCSN); if (cursor != null && cursor.getChange() != null) { CSN newCSN = cursor.getChange().getCSN(); result.update(newCSN); } } catch (ChangelogException e) { // there's no change older than eligibleCSN (case of s3/csn31) result.update(new CSN(0, 0, serverId)); } finally { close(cursor); } CSN newCSN = changelogDB.getCSNAfter(baseDn, serverId, eligibleCSN); result.update(newCSN); } else { // for this serverId, all changes in the ChangelogDb are holder // than eligibleCSN, the most recent in the db is our guy. @@ -2721,9 +2629,9 @@ public ServerState getStartState() { ServerState domainStartState = new ServerState(); for (DbHandler dbHandler : sourceDbHandlers.values()) for (CSN firstCSN : changelogDB.getDomainFirstCSNs(baseDn).values()) { domainStartState.update(dbHandler.getFirstChange()); domainStartState.update(firstCSN); } return domainStartState; } @@ -2741,10 +2649,12 @@ { CSN eligibleCSN = null; for (DbHandler db : sourceDbHandlers.values()) for (Entry<Integer, CSN> entry : changelogDB.getDomainLastCSNs(baseDn).entrySet()) { // Consider this producer (DS/db). int serverId = db.getServerId(); final int serverId = entry.getKey(); final CSN changelogLastCSN = entry.getValue(); // Should it be considered for eligibility ? 
CSN heartbeatLastCSN = @@ -2774,7 +2684,6 @@ continue; } CSN changelogLastCSN = db.getLastChange(); if (changelogLastCSN != null && (eligibleCSN == null || changelogLastCSN.newer(eligibleCSN))) { @@ -2935,15 +2844,7 @@ */ public long getLatestDomainTrimDate() { long latest = 0; for (DbHandler db : sourceDbHandlers.values()) { if (latest == 0 || latest < db.getLatestTrimDate()) { latest = db.getLatestTrimDate(); } } return latest; return changelogDB.getDomainLatestTrimDate(baseDn); } /** opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
New file @@ -0,0 +1,229 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 * Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 * Copyright 2013 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.api;

import java.util.Map;
import java.util.Set;

import org.opends.server.config.ConfigException;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;

/**
 * The changelogDB stores the replication data on persistent storage.
 * <p>
 * This interface allows callers to:
 * <ul>
 * <li>set the storage directory and the purge interval</li>
 * <li>get access to the {@link ChangeNumberIndexDB}</li>
 * <li>query or control the replication domain database(s) (composed of one or
 * more ReplicaDBs)</li>
 * <li>query/update each ReplicaDB</li>
 * </ul>
 */
public interface ChangelogDB
{

  // DB control methods

  /**
   * Set the directory to be used by the replication database.
   *
   * @param dbDirName
   *          the directory for use by the replication database
   * @throws ConfigException
   *           if a problem occurs opening the directory
   */
  void setReplicationDBDirectory(String dbDirName) throws ConfigException;

  /**
   * Get the replication server database directory. This is used by tests to do
   * some cleanup.
   *
   * @return the database directory name
   */
  String getDBDirName();

  /**
   * Initializes the replication database.
   */
  void initializeDB();

  /**
   * Sets the purge delay for the replication database. This purge delay is a
   * best effort.
   *
   * @param delayInMillis
   *          the purge delay in milliseconds
   */
  void setPurgeDelay(long delayInMillis);

  /**
   * Shutdown the replication database.
   */
  void shutdownDB();

  /**
   * Returns a new {@link ChangeNumberIndexDB} object.
   *
   * @return a new {@link ChangeNumberIndexDB} object
   * @throws ChangelogException
   *           If a database problem happened
   */
  ChangeNumberIndexDB newChangeNumberIndexDB() throws ChangelogException;

  // Domain methods

  /**
   * Returns the serverIds for the servers that are or have been part of the
   * provided replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   * @return a set of integers holding the serverIds
   */
  Set<Integer> getDomainServerIds(String baseDn);

  /**
   * Get the number of changes for the specified replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   * @return the number of changes.
   */
  long getDomainChangesCount(String baseDn);

  /**
   * Returns the FIRST {@link CSN}s of each serverId for the specified
   * replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   * @return a {serverId => FIRST CSN} Map
   */
  Map<Integer, CSN> getDomainFirstCSNs(String baseDn);

  /**
   * Returns the LAST {@link CSN}s of each serverId for the specified
   * replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   * @return a {serverId => LAST CSN} Map
   */
  Map<Integer, CSN> getDomainLastCSNs(String baseDn);

  /**
   * Retrieves the latest trim date for the specified replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   * @return the domain latest trim date
   */
  long getDomainLatestTrimDate(String baseDn);

  /**
   * Shutdown the specified replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   */
  void shutdownDomain(String baseDn);

  /**
   * Clear DB and shutdown for the specified replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   */
  void clearDomain(String baseDn);

  // serverId methods

  /**
   * Return the number of changes between 2 provided {@link CSN}s for the
   * specified serverId and replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   * @param serverId
   *          the serverId on which to act
   * @param from
   *          The lower (older) CSN
   * @param to
   *          The upper (newer) CSN
   * @return The computed number of changes
   */
  long getCount(String baseDn, int serverId, CSN from, CSN to);

  /**
   * Returns the {@link CSN} situated immediately after the specified
   * {@link CSN} for the specified serverId and replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   * @param serverId
   *          the serverId for which we want the information
   * @param startAfterCSN
   *          the CSN after which the returned CSN is situated
   * @return the {@link CSN} situated immediately after the supplied CSN.
   *         NOTE(review): the value returned when no such CSN exists is not
   *         specified by this interface - confirm against the implementation.
   */
  CSN getCSNAfter(String baseDn, int serverId, CSN startAfterCSN);

  /**
   * Generates a non empty {@link ReplicaDBCursor} for the specified serverId
   * and replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   * @param serverId
   *          the serverId on which to act
   * @param startAfterCSN
   *          The position where the iterator must start
   * @return a {@link ReplicaDBCursor} if the ReplicaDB is not empty, null
   *         otherwise
   */
  ReplicaDBCursor getCursorFrom(String baseDn, int serverId, CSN startAfterCSN);

  /**
   * Publishes the provided update message to the replicaDB for the specified
   * serverId and replication domain.
   *
   * @param baseDn
   *          the replication domain baseDn
   * @param serverId
   *          the serverId on which to act
   * @param updateMsg
   *          the update message to publish to the replicaDB
   * @return true if a db had to be created to publish this message
   * @throws ChangelogException
   *           If a database problem happened
   */
  boolean publishUpdateMsg(String baseDn, int serverId, UpdateMsg updateMsg)
      throws ChangelogException;

}
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogException.java
@@ -70,7 +70,7 @@ * @param cause * The underlying cause that triggered this exception. */ protected ChangelogException(Message message, Throwable cause) public ChangelogException(Message message, Throwable cause) { super(message, cause); } opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
New file @@ -0,0 +1,452 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 * Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 * Copyright 2013 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;

import java.io.File;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.Pair;

import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;

/**
 * JE implementation of the ChangelogDB.
 */
public class JEChangelogDB implements ChangelogDB
{

  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();

  /**
   * Maps each replication domain baseDN to the DbHandlers of that domain,
   * themselves keyed by serverId.
   */
  private final Map<String, Map<Integer, DbHandler>> sourceDbHandlers =
      new ConcurrentHashMap<String, Map<Integer, DbHandler>>();
  /** The database environment, created by {@link #initializeDB()}. */
  private ReplicationDbEnv dbEnv;
  /** The configured name of the replication DB directory. */
  private String dbDirName = null;
  /** The replication DB directory resolved from {@link #dbDirName}. */
  private File dbDirectory;

  /** The local replication server. */
  private final ReplicationServer replicationServer;

  /**
   * Builds an instance of this class.
   *
   * @param replicationServer
   *          the local replication server.
   */
  public JEChangelogDB(ReplicationServer replicationServer)
  {
    this.replicationServer = replicationServer;
  }

  /**
   * Returns the serverId => DbHandler map of the supplied replication domain.
   *
   * @param baseDn
   *          the replication domain baseDN
   * @return the map registered for this domain, or an empty map when the
   *         domain is not registered; never null
   */
  private Map<Integer, DbHandler> getDomainMap(String baseDn)
  {
    final Map<Integer, DbHandler> domainMap = sourceDbHandlers.get(baseDn);
    if (domainMap != null)
    {
      return domainMap;
    }
    return Collections.emptyMap();
  }

  /**
   * Returns the DbHandler registered for the supplied serverId in the supplied
   * replication domain.
   *
   * @param baseDn
   *          the replication domain baseDN
   * @param serverId
   *          the serverId to look up
   * @return the matching DbHandler, or null when either the domain or the
   *         serverId is not registered
   */
  private DbHandler getDbHandler(String baseDn, int serverId)
  {
    return getDomainMap(baseDn).get(serverId);
  }

  /**
   * Provision resources for the specified serverId in the specified replication
   * domain.
   *
   * @param baseDn
   *          the replication domain where to add the serverId
   * @param serverId
   *          the server Id to add to the replication domain
   * @param rs
   *          the replication server handed to the DbHandler when one has to be
   *          created
   * @throws ChangelogException
   *           If a database error happened.
   */
  private void commission(String baseDn, int serverId, ReplicationServer rs)
      throws ChangelogException
  {
    getOrCreateDbHandler(baseDn, serverId, rs);
  }

  /**
   * Returns the DbHandler registered for the supplied domain and serverId,
   * creating and registering it (and the domain map if needed) when absent.
   *
   * @param baseDn
   *          the replication domain baseDN
   * @param serverId
   *          the serverId to look up
   * @param rs
   *          the replication server handed to a newly created DbHandler
   * @return a Pair holding the DbHandler and a Boolean which is true only when
   *         the DbHandler was created by this call
   * @throws ChangelogException
   *           If a database error happened.
   */
  private Pair<DbHandler, Boolean> getOrCreateDbHandler(String baseDn,
      int serverId, ReplicationServer rs) throws ChangelogException
  {
    // The lookup and the registration must happen under the same lock so that
    // two callers cannot both create a handler for the same key.
    synchronized (sourceDbHandlers)
    {
      Map<Integer, DbHandler> domainMap = sourceDbHandlers.get(baseDn);
      if (domainMap == null)
      {
        domainMap = new ConcurrentHashMap<Integer, DbHandler>();
        sourceDbHandlers.put(baseDn, domainMap);
      }

      DbHandler dbHandler = domainMap.get(serverId);
      if (dbHandler == null)
      {
        dbHandler =
            new DbHandler(serverId, baseDn, rs, dbEnv, rs.getQueueSize());
        domainMap.put(serverId, dbHandler);
        return Pair.of(dbHandler, true);
      }
      return Pair.of(dbHandler, false);
    }
  }

  /** {@inheritDoc} */
  @Override
  public void initializeDB()
  {
    try
    {
      dbEnv = new ReplicationDbEnv(
          getFileForPath(dbDirName).getAbsolutePath(), replicationServer);
      initializeChangelogState(dbEnv.readChangelogState());
    }
    catch (ChangelogException e)
    {
      // The failure is logged, not propagated.
      Message message =
          ERR_COULD_NOT_READ_DB.get(this.dbDirectory.getAbsolutePath(), e
              .getLocalizedMessage());
      logError(message);
    }
  }

  /**
   * Applies the supplied changelog state: initializes the generationId of each
   * listed replication domain, then provisions a DbHandler for each listed
   * serverId.
   *
   * @param changelogState
   *          the state read from the changelogState DB
   * @throws ChangelogException
   *           If a database error happened.
   */
  private void initializeChangelogState(final ChangelogState changelogState)
      throws ChangelogException
  {
    for (Map.Entry<String, Long> entry : changelogState
        .getDomainToGenerationId().entrySet())
    {
      replicationServer.getReplicationServerDomain(entry.getKey(), true)
          .initGenerationID(entry.getValue());
    }
    for (Map.Entry<String, List<Integer>> entry : changelogState
        .getDomainToServerIds().entrySet())
    {
      final String baseDn = entry.getKey();
      for (int serverId : entry.getValue())
      {
        commission(baseDn, serverId, replicationServer);
      }
    }
  }

  /** {@inheritDoc} */
  @Override
  public void shutdownDB()
  {
    // dbEnv stays null until initializeDB() has created it.
    if (dbEnv != null)
    {
      dbEnv.shutdown();
    }
  }

  /** {@inheritDoc} */
  @Override
  public Set<Integer> getDomainServerIds(String baseDn)
  {
    return getDomainMap(baseDn).keySet();
  }

  /** {@inheritDoc} */
  @Override
  public long getCount(String baseDn, int serverId, CSN from, CSN to)
  {
    DbHandler dbHandler = getDbHandler(baseDn, serverId);
    if (dbHandler != null)
    {
      return dbHandler.getCount(from, to);
    }
    return 0;
  }

  /** {@inheritDoc} */
  @Override
  public long getDomainChangesCount(String baseDn)
  {
    long entryCount = 0;
    for (DbHandler dbHandler : getDomainMap(baseDn).values())
    {
      entryCount += dbHandler.getChangesCount();
    }
    return entryCount;
  }

  /** {@inheritDoc} */
  @Override
  public void shutdownDomain(String baseDn)
  {
    shutdownDbHandlers(getDomainMap(baseDn));
  }

  /**
   * Shuts down every DbHandler of the supplied map, then empties the map.
   *
   * @param domainMap
   *          the serverId => DbHandler map of one replication domain
   */
  private void shutdownDbHandlers(Map<Integer, DbHandler> domainMap)
  {
    synchronized (domainMap)
    {
      for (DbHandler dbHandler : domainMap.values())
      {
        dbHandler.shutdown();
      }
      domainMap.clear();
    }
  }

  /** {@inheritDoc} */
  @Override
  public Map<Integer, CSN> getDomainFirstCSNs(String baseDn)
  {
    final Map<Integer, DbHandler> domainMap = getDomainMap(baseDn);
    final Map<Integer, CSN> results =
        new HashMap<Integer, CSN>(domainMap.size());
    for (DbHandler dbHandler : domainMap.values())
    {
      results.put(dbHandler.getServerId(), dbHandler.getFirstChange());
    }
    return results;
  }

  /** {@inheritDoc} */
  @Override
  public Map<Integer, CSN> getDomainLastCSNs(String baseDn)
  {
    final Map<Integer, DbHandler> domainMap = getDomainMap(baseDn);
    final Map<Integer, CSN> results =
        new HashMap<Integer, CSN>(domainMap.size());
    for (DbHandler dbHandler : domainMap.values())
    {
      results.put(dbHandler.getServerId(), dbHandler.getLastChange());
    }
    return results;
  }

  /** {@inheritDoc} */
  @Override
  public void clearDomain(String baseDn)
  {
    final Map<Integer, DbHandler> domainMap = getDomainMap(baseDn);
    synchronized (domainMap)
    {
      for (DbHandler dbHandler : domainMap.values())
      {
        try
        {
          dbHandler.clear();
        }
        catch (Exception e)
        {
          // A failure on one handler is logged and must not prevent clearing
          // the remaining ones.
          // TODO: i18n
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(), e
              .getMessage()
              + " " + stackTraceToSingleLineString(e)));
          logError(mb.toMessage());
        }
      }
      shutdownDbHandlers(domainMap);
    }

    try
    {
      dbEnv.clearGenerationId(baseDn);
    }
    catch (Exception ignored)
    {
      // Deliberately best effort: the failure is only traced in debug mode.
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
      }
    }
  }

  /** {@inheritDoc} */
  @Override
  public void setPurgeDelay(long delay)
  {
    for (Map<Integer, DbHandler> domainMap : sourceDbHandlers.values())
    {
      for (DbHandler dbHandler : domainMap.values())
      {
        dbHandler.setPurgeDelay(delay);
      }
    }
  }

  /** {@inheritDoc} */
  @Override
  public long getDomainLatestTrimDate(String baseDn)
  {
    long latest = 0;
    for (DbHandler dbHandler : getDomainMap(baseDn).values())
    {
      if (latest == 0 || latest < dbHandler.getLatestTrimDate())
      {
        latest = dbHandler.getLatestTrimDate();
      }
    }
    return latest;
  }

  /** {@inheritDoc} */
  @Override
  public CSN getCSNAfter(String baseDn, int serverId, CSN startAfterCSN)
  {
    final DbHandler dbHandler = getDbHandler(baseDn, serverId);
    if (dbHandler == null)
    {
      // Unknown domain or serverId: there is no change to report. Without
      // this guard the call below would throw a NullPointerException.
      return null;
    }

    ReplicaDBCursor cursor = null;
    try
    {
      cursor = dbHandler.generateCursorFrom(startAfterCSN);
      if (cursor != null && cursor.getChange() != null)
      {
        return cursor.getChange().getCSN();
      }
      return null;
    }
    catch (ChangelogException e)
    {
      // there's no change older than startAfterCSN
      return new CSN(0, 0, serverId);
    }
    finally
    {
      close(cursor);
    }
  }

  /** {@inheritDoc} */
  @Override
  public ChangeNumberIndexDB newChangeNumberIndexDB() throws ChangelogException
  {
    return new DraftCNDbHandler(replicationServer, this.dbEnv);
  }

  /** {@inheritDoc} */
  @Override
  public void setReplicationDBDirectory(String dbDirName)
      throws ConfigException
  {
    // Fall back to the default directory name when none is configured.
    if (dbDirName == null)
    {
      dbDirName = "changelogDb";
    }
    this.dbDirName = dbDirName;

    // Check that this path exists or create it.
    dbDirectory = getFileForPath(this.dbDirName);
    try
    {
      if (!dbDirectory.exists())
      {
        // NOTE(review): the boolean result of mkdir() is not checked, so a
        // failed creation goes unnoticed here.
        dbDirectory.mkdir();
      }
    }
    catch (Exception e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(e.getLocalizedMessage());
      mb.append(" ");
      mb.append(String.valueOf(dbDirectory));
      Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString());
      throw new ConfigException(msg, e);
    }
  }

  /** {@inheritDoc} */
  @Override
  public String getDBDirName()
  {
    return this.dbDirName;
  }

  /** {@inheritDoc} */
  @Override
  public ReplicaDBCursor getCursorFrom(String baseDn, int serverId,
      CSN startAfterCSN)
  {
    DbHandler dbHandler = getDbHandler(baseDn, serverId);
    if (dbHandler == null)
    {
      return null;
    }

    ReplicaDBCursor it;
    try
    {
      it = dbHandler.generateCursorFrom(startAfterCSN);
    }
    catch (Exception e)
    {
      // Any failure to build the cursor is reported to the caller as
      // "no cursor".
      return null;
    }

    // Same null tolerance as getCSNAfter() applies to generated cursors.
    if (it == null)
    {
      return null;
    }

    if (!it.next())
    {
      // Nothing to iterate over: release the cursor rather than handing an
      // exhausted one to the caller.
      close(it);
      return null;
    }

    return it;
  }

  /** {@inheritDoc} */
  @Override
  public boolean publishUpdateMsg(String baseDn, int serverId,
      UpdateMsg updateMsg) throws ChangelogException
  {
    final Pair<DbHandler, Boolean> pair =
        getOrCreateDbHandler(baseDn, serverId, replicationServer);
    final DbHandler dbHandler = pair.getFirst();
    final boolean wasCreated = pair.getSecond();

    dbHandler.add(updateMsg);
    return wasCreated;
  }

}
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -34,6 +34,7 @@ import org.opends.messages.Message; import org.opends.messages.MessageBuilder; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.server.ChangelogState; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.changelog.api.ChangelogException; @@ -166,12 +167,14 @@ } /** * Read the list of known servers from the database and start dbHandler * for each of them. * Read the list of known servers from the database and start dbHandler for * each of them. * * @throws ChangelogException in case of underlying Exception * @return the {@link ChangelogState} read from the changelogState DB * @throws ChangelogException * if a database problem occurs */ public void initializeFromChangelogStateDB() throws ChangelogException public ChangelogState readChangelogState() throws ChangelogException { DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); @@ -179,6 +182,8 @@ try { final ChangelogState result = new ChangelogState(); OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); while (status == OperationStatus.SUCCESS) { @@ -197,7 +202,7 @@ if (debugEnabled()) debug("has read baseDn=" + baseDn + " generationId=" +generationId); replicationServer.initDomainGenerationID(baseDn, generationId); result.setDomainGenerationId(baseDn, generationId); } else { @@ -207,11 +212,13 @@ if (debugEnabled()) debug("has read: baseDn=" + baseDn + " serverId=" + serverId); replicationServer.addServerIdToDomain(serverId, baseDn); result.addServerIdToDomain(serverId, baseDn); } status = cursor.getNext(key, data, LockMode.DEFAULT); } return result; } catch (RuntimeException e) { opends/src/server/org/opends/server/util/Pair.java
New file @@ -0,0 +1,162 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 * Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 * Copyright 2013 ForgeRock AS
 */
package org.opends.server.util;

/**
 * Immutable holder of two ordered elements, either of which may be null.
 *
 * @param <F>
 *          type of the first pair element
 * @param <S>
 *          type of the second pair element
 */
public class Pair<F, S>
{

  /** The shared pair whose two elements are both null. */
  public static final Pair<?, ?> EMPTY = Pair.of(null, null);

  /** The first pair element. */
  private final F first;

  /** The second pair element. */
  private final S second;

  /**
   * Builds a pair holding the two supplied elements. Instances are obtained
   * through {@link #of(Object, Object)} or {@link #empty()}.
   *
   * @param first
   *          the first element of the constructed pair
   * @param second
   *          the second element of the constructed pair
   */
  private Pair(F first, S second)
  {
    this.first = first;
    this.second = second;
  }

  /**
   * Creates a new Pair from the supplied elements.
   *
   * @param first
   *          the first element of the constructed pair
   * @param second
   *          the second element of the constructed pair
   * @param <F>
   *          type of the first pair element
   * @param <S>
   *          type of the second pair element
   * @return A new Pair built with the provided elements
   */
  public static <F, S> Pair<F, S> of(F first, S second)
  {
    return new Pair<F, S>(first, second);
  }

  /**
   * Returns the shared {@link #EMPTY} pair, typed as requested by the caller.
   *
   * @param <F>
   *          type of the first pair element
   * @param <S>
   *          type of the second pair element
   * @return An empty Pair matching the required types
   */
  @SuppressWarnings("unchecked")
  public static <F, S> Pair<F, S> empty()
  {
    return (Pair<F, S>) EMPTY;
  }

  /**
   * Returns the first element of this pair.
   *
   * @return the first element of this pair
   */
  public F getFirst()
  {
    return first;
  }

  /**
   * Returns the second element of this pair.
   *
   * @return the second element of this pair
   */
  public S getSecond()
  {
    return second;
  }

  /** {@inheritDoc} */
  @Override
  public int hashCode()
  {
    final int firstStep = 31 + nullSafeHash(first);
    return 31 * firstStep + nullSafeHash(second);
  }

  /** {@inheritDoc} */
  @Override
  public boolean equals(Object obj)
  {
    if (this == obj)
    {
      return true;
    }
    if (obj == null || getClass() != obj.getClass())
    {
      return false;
    }
    final Pair<?, ?> that = (Pair<?, ?>) obj;
    return nullSafeEquals(first, that.first)
        && nullSafeEquals(second, that.second);
  }

  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    final StringBuilder sb = new StringBuilder("Pair [");
    sb.append(first).append(", ").append(second).append("]");
    return sb.toString();
  }

  /**
   * Returns the hash code of the supplied object, or 0 when it is null.
   *
   * @param o
   *          the object to hash, may be null
   * @return the hash code of the object, 0 for null
   */
  private static int nullSafeHash(Object o)
  {
    return o == null ? 0 : o.hashCode();
  }

  /**
   * Compares two possibly null objects: two nulls are equal, otherwise the
   * comparison is delegated to the left operand's equals method.
   *
   * @param left
   *          the object whose equals method is used, may be null
   * @param right
   *          the object to compare with, may be null
   * @return true when both are null or left.equals(right) is true
   */
  private static boolean nullSafeEquals(Object left, Object right)
  {
    return left == null ? right == null : left.equals(right);
  }

}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java @@ -25,7 +25,7 @@ * Copyright 2006-2010 Sun Microsystems, Inc. * Portions copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication; package org.opends.server.replication.server; import java.io.*; import java.net.Socket; @@ -44,6 +44,7 @@ import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.protocols.internal.InternalSearchOperation; import org.opends.server.protocols.ldap.*; import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.CSNGenerator; import org.opends.server.replication.common.MultiDomainServerState; @@ -53,7 +54,6 @@ import org.opends.server.replication.plugin.LDAPReplicationDomain; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.protocol.*; import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.ReplicationServerDomain; import org.opends.server.replication.server.changelog.je.DraftCNDbHandler; @@ -828,14 +828,12 @@ publishDeleteMsgInOTest(s2test, csn9, tn, 9); sleep(500); ReplicationServerDomain rsd = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING); ServerState startState = rsd.getStartState(); ServerState startState = getReplicationDomainStartState(TEST_ROOT_DN_STRING); assertEquals(startState.getCSN(s1test.getServerId()).getSeqnum(), 1); assertTrue(startState.getCSN(s2test.getServerId()) != null); assertEquals(startState.getCSN(s2test.getServerId()).getSeqnum(), 7); rsd = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING2); startState = rsd.getStartState(); startState = getReplicationDomainStartState(TEST_ROOT_DN_STRING2); 
assertEquals(startState.getCSN(s2test2.getServerId()).getSeqnum(), 2); assertEquals(startState.getCSN(s1test2.getServerId()).getSeqnum(), 6); @@ -891,6 +889,11 @@ debugInfo(tn, "Ending test successfully"); } private ServerState getReplicationDomainStartState(String baseDn) { return replicationServer.getReplicationServerDomain(baseDn).getStartState(); } private String getCookie(List<SearchResultEntry> entries, int expectedNbEntries, String tn, LDIFWriter ldifWriter, String cookie) throws Exception @@ -979,8 +982,6 @@ debugInfo(tn, "Starting test"); ReplicationBroker server01 = null; ReplicationServerDomain d1 = null; ReplicationServerDomain d2 = null; try { @@ -1010,10 +1011,7 @@ // --- // 2. Now set up a very short purge delay on the replication changelogs // so that this test can play with a trimmed changelog. d1 = replicationServer.getReplicationServerDomain("o=test"); d2 = replicationServer.getReplicationServerDomain("o=test2"); d1.setPurgeDelay(1); d2.setPurgeDelay(1); replicationServer.getChangelogDB().setPurgeDelay(1); // Sleep longer than this delay - so that the changelog is trimmed Thread.sleep(1000); @@ -1047,8 +1045,8 @@ // returns the appropriate error. publishDeleteMsgInOTest(server01, csns[3], tn, 1); debugInfo(tn, "d1 trimdate" + d1.getStartState()); debugInfo(tn, "d2 trimdate" + d2.getStartState()); debugInfo(tn, "d1 trimdate" + getReplicationDomainStartState("o=test")); debugInfo(tn, "d2 trimdate" + getReplicationDomainStartState("o=test2")); searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, tn, UNWILLING_TO_PERFORM); assertEquals(searchOp.getSearchEntries().size(), 0); assertTrue(searchOp.getErrorMessage().toString().startsWith( @@ -1059,15 +1057,7 @@ { stop(server01); // And reset changelog purge delay for the other tests. 
if (d1 != null) { d1.setPurgeDelay(15 * 1000); } if (d2 != null) { d2.setPurgeDelay(15 * 1000); } replicationServer.getChangelogDB().setPurgeDelay(15 * 1000); replicationServer.clearDb(); } debugInfo(tn, "Ending test successfully");