| | |
| | | import java.io.IOException; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.*; |
| | | import java.util.Map.Entry; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | import java.util.concurrent.TimeUnit; |
| | |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.*; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.ReplicaDBCursor; |
| | | import org.opends.server.replication.server.changelog.je.DbHandler; |
| | | import org.opends.server.types.*; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | |
| | | private final Queue<MessageHandler> otherHandlers = |
| | | new ConcurrentLinkedQueue<MessageHandler>(); |
| | | |
| | | /** |
| | | * This map contains the List of updates received from each LDAP server. |
| | | */ |
| | | private final Map<Integer, DbHandler> sourceDbHandlers = |
| | | new ConcurrentHashMap<Integer, DbHandler>(); |
| | | private final ChangelogDB changelogDB; |
| | | /** The ReplicationServer that created the current instance. */ |
| | | private ReplicationServer localReplicationServer; |
| | | |
| | | /** GenerationId management. */ |
| | | /** |
| | | * The generationId of the current replication domain. The generationId is |
| | | * computed by hashing the first 1000 entries in the DB. |
| | | */ |
| | | private volatile long generationId = -1; |
| | | private boolean generationIdSavedStatus = false; |
| | | /** |
| | | * JNR, this is legacy code, hard to follow logic. I think what this field |
| | | * tries to say is: "is the generationId in use anywhere?", i.e. is there a |
| | | * replication topology in place? As soon as an answer to any of these |
| | | * question comes true, then it is set to true. |
| | | * <p> |
| | | * It looks like the only use of this field is to prevent the |
| | | * {@link #generationId} from being reset by |
| | | * {@link #resetGenerationIdIfPossible()}. |
| | | */ |
| | | private volatile boolean generationIdSavedStatus = false; |
| | | |
| | | /** The tracer object for the debug logger. */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | |
| | | this.assuredTimeoutTimer = new Timer("Replication server RS(" |
| | | + localReplicationServer.getServerId() |
| | | + ") assured timer for domain \"" + baseDn + "\"", true); |
| | | this.changelogDB = localReplicationServer.getChangelogDB(); |
| | | |
| | | DirectoryServer.registerMonitorProvider(this); |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | if (!publishMessage(update, serverId)) |
| | | if (!publishUpdateMsg(update, serverId)) |
| | | { |
| | | return; |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | private boolean publishMessage(UpdateMsg update, int serverId) |
| | | private boolean publishUpdateMsg(UpdateMsg updateMsg, int serverId) |
| | | { |
| | | // look for the dbHandler that is responsible for the LDAP server which |
| | | // generated the change. |
| | | DbHandler dbHandler; |
| | | synchronized (sourceDbHandlers) |
| | | try |
| | | { |
| | | dbHandler = sourceDbHandlers.get(serverId); |
| | | if (dbHandler == null) |
| | | if (this.changelogDB.publishUpdateMsg(baseDn, serverId, updateMsg)) |
| | | { |
| | | try |
| | | { |
| | | dbHandler = localReplicationServer.newDbHandler(serverId, baseDn); |
| | | generationIdSavedStatus = true; |
| | | } catch (ChangelogException e) |
| | | /* |
| | | * JNR: Matt and I had a hard time figuring out where to put this |
| | | * synchronized block. We elected to put it here, but without a strong |
| | | * conviction. |
| | | */ |
| | | synchronized (generationIDLock) |
| | | { |
| | | /* |
| | | * Because of database problem we can't save any more changes |
| | | * from at least one LDAP server. |
| | | * This replicationServer therefore can't do it's job properly anymore |
| | | * and needs to close all its connections and shutdown itself. |
| | | * JNR: I think the generationIdSavedStatus is set to true because |
| | | * method above created a ReplicaDB which assumes the generationId was |
| | | * communicated to another server. Hence setting true on this field |
| | | * prevent the generationId from being reset. |
| | | */ |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); |
| | | mb.append(" "); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | localReplicationServer.shutdown(); |
| | | return false; |
| | | generationIdSavedStatus = true; |
| | | } |
| | | sourceDbHandlers.put(serverId, dbHandler); |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | // Publish the messages to the source handler |
| | | dbHandler.add(update); |
| | | return true; |
| | | catch (ChangelogException e) |
| | | { |
| | | /* |
| | | * Because of database problem we can't save any more changes from at |
| | | * least one LDAP server. This replicationServer therefore can't do it's |
| | | * job properly anymore and needs to close all its connections and |
| | | * shutdown itself. |
| | | */ |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); |
| | | mb.append(" "); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | localReplicationServer.shutdown(); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | private NotAssuredUpdateMsg addUpdate(ServerHandler sHandler, |
| | |
| | | */ |
| | | public Set<Integer> getServerIds() |
| | | { |
| | | return sourceDbHandlers.keySet(); |
| | | return changelogDB.getDomainServerIds(baseDn); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN) |
| | | { |
| | | DbHandler dbHandler = sourceDbHandlers.get(serverId); |
| | | if (dbHandler == null) |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | ReplicaDBCursor cursor; |
| | | try |
| | | { |
| | | cursor = dbHandler.generateCursorFrom(startAfterCSN); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | if (!cursor.next()) |
| | | { |
| | | close(cursor); |
| | | return null; |
| | | } |
| | | |
| | | return cursor; |
| | | return changelogDB.getCursorFrom(baseDn, serverId, startAfterCSN); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public long getCount(int serverId, CSN from, CSN to) |
| | | { |
| | | DbHandler dbHandler = sourceDbHandlers.get(serverId); |
| | | if (dbHandler != null) |
| | | { |
| | | return dbHandler.getCount(from, to); |
| | | } |
| | | return 0; |
| | | return changelogDB.getCount(baseDn, serverId, from, to); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public long getChangesCount() |
| | | { |
| | | long entryCount = 0; |
| | | for (DbHandler dbHandler : sourceDbHandlers.values()) |
| | | { |
| | | entryCount += dbHandler.getChangesCount(); |
| | | } |
| | | return entryCount; |
| | | return changelogDB.getDomainChangesCount(baseDn); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Sets the provided DbHandler associated to the provided serverId. |
| | | * |
| | | * @param serverId the serverId for the server to which is |
| | | * associated the DbHandler. |
| | | * @param dbHandler the dbHandler associated to the serverId. |
| | | * |
| | | * @throws ChangelogException If a database error happened. |
| | | */ |
| | | public void setDbHandler(int serverId, DbHandler dbHandler) |
| | | throws ChangelogException |
| | | { |
| | | synchronized (sourceDbHandlers) |
| | | { |
| | | sourceDbHandlers.put(serverId, dbHandler); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the destination handlers for a routable message. |
| | | * |
| | | * @param msg The message to route. |
| | |
| | | |
| | | stopAllServers(true); |
| | | |
| | | shutdownDbHandlers(); |
| | | } |
| | | |
| | | /** Shutdown all the dbHandlers. */ |
| | | private void shutdownDbHandlers() |
| | | { |
| | | synchronized (sourceDbHandlers) |
| | | { |
| | | for (DbHandler dbHandler : sourceDbHandlers.values()) |
| | | { |
| | | dbHandler.shutdown(); |
| | | } |
| | | sourceDbHandlers.clear(); |
| | | } |
| | | changelogDB.shutdownDomain(baseDn); |
| | | } |
| | | |
| | | /** |
| | |
| | | public ServerState getDbServerState() |
| | | { |
| | | ServerState serverState = new ServerState(); |
| | | for (DbHandler db : sourceDbHandlers.values()) |
| | | for (CSN lastCSN : changelogDB.getDomainLastCSNs(baseDn).values()) |
| | | { |
| | | serverState.update(db.getLastChange()); |
| | | serverState.update(lastCSN); |
| | | } |
| | | return serverState; |
| | | } |
| | |
| | | public void clearDbs() |
| | | { |
| | | // Reset the localchange and state db for the current domain |
| | | synchronized (sourceDbHandlers) |
| | | { |
| | | for (DbHandler dbHandler : sourceDbHandlers.values()) |
| | | { |
| | | try |
| | | { |
| | | dbHandler.clear(); |
| | | } catch (Exception e) |
| | | { |
| | | // TODO: i18n |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(), |
| | | e.getMessage() + " " + stackTraceToSingleLineString(e))); |
| | | logError(mb.toMessage()); |
| | | } |
| | | } |
| | | shutdownDbHandlers(); |
| | | } |
| | | changelogDB.clearDomain(baseDn); |
| | | try |
| | | { |
| | | localReplicationServer.clearGenerationId(baseDn); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Set the purge delay on all the db Handlers for this Domain |
| | | * of Replication. |
| | | * |
| | | * @param delay The new purge delay to use. |
| | | */ |
| | | public void setPurgeDelay(long delay) |
| | | { |
| | | for (DbHandler dbHandler : sourceDbHandlers.values()) |
| | | { |
| | | dbHandler.setPurgeDelay(delay); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get the map of connected DSs. |
| | | * @return The map of connected DSs |
| | | */ |
| | |
| | | { |
| | | for (int serverId : dbState) |
| | | { |
| | | DbHandler h = sourceDbHandlers.get(serverId); |
| | | CSN mostRecentDbCSN = dbState.getCSN(serverId); |
| | | try { |
| | | // Is the most recent change in the Db newer than eligible CSN ? |
| | |
| | | if (eligibleCSN.olderOrEqual(mostRecentDbCSN)) |
| | | { |
| | | // let's try to seek the first change <= eligibleCSN |
| | | ReplicaDBCursor cursor = null; |
| | | try { |
| | | cursor = h.generateCursorFrom(eligibleCSN); |
| | | if (cursor != null && cursor.getChange() != null) { |
| | | CSN newCSN = cursor.getChange().getCSN(); |
| | | result.update(newCSN); |
| | | } |
| | | } catch (ChangelogException e) { |
| | | // there's no change older than eligibleCSN (case of s3/csn31) |
| | | result.update(new CSN(0, 0, serverId)); |
| | | } finally { |
| | | close(cursor); |
| | | } |
| | | CSN newCSN = changelogDB.getCSNAfter(baseDn, serverId, eligibleCSN); |
| | | result.update(newCSN); |
| | | } else { |
| | | // for this serverId, all changes in the ChangelogDb are older |
| | | // than eligibleCSN, the most recent in the db is our guy. |
| | |
| | | public ServerState getStartState() |
| | | { |
| | | ServerState domainStartState = new ServerState(); |
| | | for (DbHandler dbHandler : sourceDbHandlers.values()) |
| | | for (CSN firstCSN : changelogDB.getDomainFirstCSNs(baseDn).values()) |
| | | { |
| | | domainStartState.update(dbHandler.getFirstChange()); |
| | | domainStartState.update(firstCSN); |
| | | } |
| | | return domainStartState; |
| | | } |
| | |
| | | { |
| | | CSN eligibleCSN = null; |
| | | |
| | | for (DbHandler db : sourceDbHandlers.values()) |
| | | for (Entry<Integer, CSN> entry : |
| | | changelogDB.getDomainLastCSNs(baseDn).entrySet()) |
| | | { |
| | | // Consider this producer (DS/db). |
| | | int serverId = db.getServerId(); |
| | | final int serverId = entry.getKey(); |
| | | final CSN changelogLastCSN = entry.getValue(); |
| | | |
| | | // Should it be considered for eligibility ? |
| | | CSN heartbeatLastCSN = |
| | |
| | | continue; |
| | | } |
| | | |
| | | CSN changelogLastCSN = db.getLastChange(); |
| | | if (changelogLastCSN != null |
| | | && (eligibleCSN == null || changelogLastCSN.newer(eligibleCSN))) |
| | | { |
| | |
| | | */ |
| | | public long getLatestDomainTrimDate() |
| | | { |
| | | long latest = 0; |
| | | for (DbHandler db : sourceDbHandlers.values()) |
| | | { |
| | | if (latest == 0 || latest < db.getLatestTrimDate()) |
| | | { |
| | | latest = db.getLatestTrimDate(); |
| | | } |
| | | } |
| | | return latest; |
| | | return changelogDB.getDomainLatestTrimDate(baseDn); |
| | | } |
| | | |
| | | /** |