| | |
| | | change %s to replicaDB %s %s because flushing thread is shutting down |
| | | SEVERE_ERR_CHANGELOG_READ_STATE_WRONG_ROOT_PATH_241=Error when retrieving changelog \ |
| | | state from root path '%s' : directory might not exist |
| | | SEVERE_ERR_CHANGELOG_READ_STATE_NO_GENERATION_ID_FOUND_242=Error when retrieving \ |
| | | changelog state from root path '%s' : no generation id file found in domain \ |
| | | directory '%s' |
| | | SEVERE_ERR_CHANGELOG_READ_STATE_CANT_READ_DOMAIN_DIRECTORY_243=Error when retrieving \ |
| | | changelog state from root path '%s' : IO error on domain directory '%s' when retrieving \ |
| | | list of server ids |
| | |
| | | head log file from '%s' to '%s' |
| | | INFO_CHANGELOG_LOG_FILE_ROTATION_276=Rotation needed for log file '%s', \ |
| | | size of head log file is %d bytes |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH_277=Could not add replica \ |
| | | offline for domain %s and server id %d because the path '%s' does not exist |
| | | SEVERE_ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE_278=Could not write offline \ |
| | | replica information for domain %s and server id %d, using path '%s' (offline CSN is %s) |
| | | SEVERE_ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE_279=Could not read replica offline \ |
| | |
| | | addOperation.setAttachment(SYNCHROCONTEXT, ctx); |
| | | } |
| | | |
| | | /**
| | |  * {@inheritDoc}
| | |  * <p>
| | |  * This implementation delegates to
| | |  * {@code pendingChanges.putReplicaOfflineMsg()}.
| | |  */ |
| | | @Override |
| | | public void publishReplicaOfflineMsg() |
| | | { |
| | |   pendingChanges.putReplicaOfflineMsg(); |
| | | } |
| | | |
| | | /** |
| | | * Check if an operation must be synchronized. |
| | | * Also update the list of pending changes and the server RUV |
| | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.LDAPUpdateMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.types.operation.PluginOperation; |
| | | |
| | | /** |
| | |
| | | { |
| | | private final CSN csn; |
| | | private boolean committed; |
| | | private LDAPUpdateMsg msg; |
| | | private UpdateMsg msg; |
| | | private final PluginOperation op; |
| | | private ServerState dependencyState; |
| | | |
| | |
| | | * @param op the operation to use |
| | | * @param msg the message to use (can be null for local operations) |
| | | */ |
| | | PendingChange(CSN csn, PluginOperation op, LDAPUpdateMsg msg) |
| | | PendingChange(CSN csn, PluginOperation op, UpdateMsg msg) |
| | | { |
| | | this.csn = csn; |
| | | this.committed = false; |
| | |
| | | * @return the message if operation was a replication operation |
| | | * null if the operation was a local operation |
| | | */ |
| | | public LDAPUpdateMsg getMsg() |
| | | public UpdateMsg getMsg() |
| | | { |
| | | return msg; |
| | | } |
| | | |
| | | /** |
| | |  * Returns the message of this PendingChange viewed as an LDAPUpdateMsg. |
| | |  * |
| | |  * @return the message cast to LDAPUpdateMsg when it is an instance of that |
| | |  *         type, null when there is no message or it is of another type |
| | |  */ |
| | | public LDAPUpdateMsg getLDAPUpdateMsg() |
| | | { |
| | |   return msg instanceof LDAPUpdateMsg ? (LDAPUpdateMsg) msg : null; |
| | | } |
| | | |
| | | /** |
| | | * Set the message associated to the PendingChange. |
| | | * @param msg the message |
| | | */ |
| | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.CSNGenerator; |
| | | import org.opends.server.replication.protocol.LDAPUpdateMsg; |
| | | import org.opends.server.replication.protocol.ReplicaOfflineMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.service.ReplicationDomain; |
| | | import org.opends.server.types.operation.PluginOperation; |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | |  * Queues a replica offline message in the pending list and pushes the |
| | |  * committed changes. |
| | |  * <p> |
| | |  * The message is stamped with a newly generated CSN and its pending change |
| | |  * is marked committed before being stored. |
| | |  */ |
| | | public synchronized void putReplicaOfflineMsg() |
| | | { |
| | |   final CSN csn = csnGenerator.newCSN(); |
| | |   final ReplicaOfflineMsg offlineMsg = new ReplicaOfflineMsg(csn); |
| | | |
| | |   // there is no operation attached to a replica offline message |
| | |   final PendingChange change = new PendingChange(csn, null, offlineMsg); |
| | |   change.setCommitted(true); |
| | |   pendingChanges.put(csn, change); |
| | | |
| | |   pushCommittedChanges(); |
| | | } |
| | | |
| | | /** |
| | | * Push all committed local changes to the replicationServer service. |
| | | */ |
| | | synchronized void pushCommittedChanges() |
| | |
| | | while (firstChange != null && firstChange.isCommitted()) |
| | | { |
| | | final PluginOperation op = firstChange.getOp(); |
| | | if (op != null && !op.isSynchronizationOperation()) |
| | | final UpdateMsg msg = firstChange.getMsg(); |
| | | if (msg instanceof LDAPUpdateMsg |
| | | && op != null |
| | | && !op.isSynchronizationOperation()) |
| | | { |
| | | final LDAPUpdateMsg updateMsg = firstChange.getMsg(); |
| | | if (!recoveringOldChanges) |
| | | { |
| | | domain.publish(updateMsg); |
| | | domain.publish(msg); |
| | | } |
| | | else |
| | | { |
| | | // do not push updates until the RS catches up. |
| | | // @see #setRecovering(boolean) |
| | | domain.getServerState().update(updateMsg.getCSN()); |
| | | domain.getServerState().update(msg.getCSN()); |
| | | } |
| | | } |
| | | else if (msg instanceof ReplicaOfflineMsg) |
| | | { |
| | | domain.publish(msg); |
| | | } |
| | | |
| | | // false warning: firstEntry will not be null if firstChange is not null |
| | | pendingChanges.remove(firstEntry.getKey()); |
| | |
| | | if (change.dependenciesIsCovered(state)) |
| | | { |
| | | dependentChanges.remove(change); |
| | | return change.getMsg(); |
| | | return change.getLDAPUpdateMsg(); |
| | | } |
| | | } |
| | | return null; |
| | |
| | | { |
| | | if (pendingChange.getCSN().isOlderThan(csn)) |
| | | { |
| | | final LDAPUpdateMsg pendingMsg = pendingChange.getMsg(); |
| | | final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg(); |
| | | if (pendingMsg != null) |
| | | { |
| | | if (pendingMsg instanceof DeleteMsg) |
| | |
| | | { |
| | | if (pendingChange.getCSN().isOlderThan(csn)) |
| | | { |
| | | final LDAPUpdateMsg pendingMsg = pendingChange.getMsg(); |
| | | final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg(); |
| | | if (pendingMsg instanceof AddMsg) |
| | | { |
| | | // Check if the operation to be run is an addOperation on a same DN. |
| | |
| | | return false; |
| | | } |
| | | |
| | | final DN targetDN = change.getMsg().getDN(); |
| | | final DN targetDN = change.getLDAPUpdateMsg().getDN(); |
| | | |
| | | for (PendingChange pendingChange : pendingChanges.values()) |
| | | { |
| | | if (pendingChange.getCSN().isOlderThan(csn)) |
| | | { |
| | | final LDAPUpdateMsg pendingMsg = pendingChange.getMsg(); |
| | | final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg(); |
| | | if (pendingMsg != null) |
| | | { |
| | | if (pendingMsg instanceof DeleteMsg) |
| | |
| | | { |
| | | if (pendingChange.getCSN().isOlderThan(csn)) |
| | | { |
| | | final LDAPUpdateMsg pendingMsg = pendingChange.getMsg(); |
| | | final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg(); |
| | | if (pendingMsg != null) |
| | | { |
| | | if (pendingMsg instanceof DeleteMsg) |
| | |
| | | /** |
| | | * The constant for the 8th version of the replication protocol. |
| | | * <ul> |
| | | * <li>StopMsg now has a timestamp to communicate the replica stop time.</li> |
| | | * <li>New ReplicaOfflineMsg.</li> |
| | | * </ul> |
| | | */ |
| | | public static final short REPLICATION_PROTOCOL_V8 = 8; |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | |
| | | import static org.opends.server.replication.protocol.ByteArrayBuilder.*; |
| | | |
| | | /** |
| | |  * Class that defines the messages sent by a replica (DS) to the replication |
| | |  * server (RS) to let the RS know the date at which a replica went offline. |
| | |  * <p> |
| | |  * The serialized form is made of the message type byte, the protocol version |
| | |  * (short) and the offline CSN. |
| | |  */ |
| | | public class ReplicaOfflineMsg extends UpdateMsg |
| | | { |
| | | |
| | |   /** |
| | |    * Constructor of a replica offline message providing the offline timestamp |
| | |    * in a CSN. The message carries an empty payload. |
| | |    * |
| | |    * @param offlineCSN |
| | |    *          the provided offline CSN |
| | |    */ |
| | |   public ReplicaOfflineMsg(final CSN offlineCSN) |
| | |   { |
| | |     super(offlineCSN, new byte[0]); |
| | |   } |
| | | |
| | |   /** |
| | |    * Creates a message by deserializing it from the provided byte array. |
| | |    * |
| | |    * @param in |
| | |    *          The provided byte array. |
| | |    * @throws DataFormatException |
| | |    *           When the message type is not the replica offline type, when |
| | |    *           unexpected bytes remain after the CSN, or when any other |
| | |    *           error occurs during decoding. |
| | |    */ |
| | |   public ReplicaOfflineMsg(byte[] in) throws DataFormatException |
| | |   { |
| | |     try |
| | |     { |
| | |       final ByteArrayScanner scanner = new ByteArrayScanner(in); |
| | |       final byte msgType = scanner.nextByte(); |
| | |       if (msgType != MSG_TYPE_REPLICA_OFFLINE) |
| | |       { |
| | |         throw new DataFormatException("input is not a valid " |
| | |             + getClass().getSimpleName() + " message: " + msgType); |
| | |       } |
| | |       protocolVersion = scanner.nextShort(); |
| | |       csn = scanner.nextCSN(); |
| | | |
| | |       if (!scanner.isEmpty()) |
| | |       { |
| | |         throw new DataFormatException( |
| | |             "Did not expect to find more bytes to read for " |
| | |             + getClass().getSimpleName()); |
| | |       } |
| | |     } |
| | |     catch (RuntimeException e) |
| | |     { |
| | |       // Index out of bounds, bad format, etc. |
| | |       // DataFormatException has no constructor accepting a cause, |
| | |       // so attach the original failure explicitly to keep it diagnosable. |
| | |       final DataFormatException dfe = new DataFormatException( |
| | |           "byte[] is not a valid " + getClass().getSimpleName()); |
| | |       dfe.initCause(e); |
| | |       throw dfe; |
| | |     } |
| | |   } |
| | | |
| | |   /** {@inheritDoc} */ |
| | |   @Override |
| | |   public byte[] getBytes(short protocolVersion) |
| | |   { |
| | |     // same layout as the one read back by the byte[] constructor |
| | |     final ByteArrayBuilder builder = new ByteArrayBuilder(size()); |
| | |     builder.appendByte(MSG_TYPE_REPLICA_OFFLINE); |
| | |     builder.appendShort(protocolVersion); |
| | |     builder.appendCSN(csn); |
| | |     return builder.toByteArray(); |
| | |   } |
| | | |
| | |   /** {@inheritDoc} */ |
| | |   @Override |
| | |   public int size() |
| | |   { |
| | |     // message type + protocol version + offline CSN |
| | |     return bytes(1) + shorts(1) + csns(1); |
| | |   } |
| | | |
| | |   /** {@inheritDoc} */ |
| | |   @Override |
| | |   public String toString() |
| | |   { |
| | |     return getClass().getSimpleName() + " offlineCSN=" + csn; |
| | |   } |
| | | } |
| | |
| | | // EntryMsg, InitializeRequestMsg, InitializeTargetMsg, ErrorMsg |
| | | // TopologyMsg |
| | | |
| | | /** @since {@link ProtocolVersion#REPLICATION_PROTOCOL_V8} */ |
| | | static final byte MSG_TYPE_REPLICA_OFFLINE = 37; |
| | | |
| | | // Adding a new type of message here probably requires to |
| | | // change accordingly generateMsg method below |
| | | |
| | |
| | | return new StopMsg(buffer); |
| | | case MSG_TYPE_INITIALIZE_RCV_ACK: |
| | | return new InitializeRcvAckMsg(buffer); |
| | | case MSG_TYPE_REPLICA_OFFLINE: |
| | | return new ReplicaOfflineMsg(buffer); |
| | | default: |
| | | throw new DataFormatException("received message with unknown type"); |
| | | } |
| | |
| | | { |
| | | final UpdateMsg newMsg = mh.getNextMessage(false /* non blocking */); |
| | | |
| | | if (newMsg == null) |
| | | if (newMsg instanceof ReplicaOfflineMsg) |
| | | { |
| | | // a ReplicaOfflineMsg cannot be returned to a search on cn=changelog: |
| | | // proceed as if it had never been returned |
| | | continue; |
| | | } |
| | | else if (newMsg == null) |
| | | { // in non blocking mode, null means no more messages |
| | | return null; |
| | | } |
| | |
| | | { |
| | | try |
| | | { |
| | | if (updateMsg instanceof ReplicaOfflineMsg) |
| | | { |
| | | final ReplicaOfflineMsg offlineMsg = (ReplicaOfflineMsg) updateMsg; |
| | | this.domainDB.notifyReplicaOffline(baseDN, offlineMsg.getCSN()); |
| | | return true; |
| | | } |
| | | |
| | | if (this.domainDB.publishUpdateMsg(baseDN, updateMsg)) |
| | | { |
| | | /* |
| | |
| | | import org.opends.server.replication.server.changelog.api.*; |
| | | import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer; |
| | | import org.opends.server.replication.server.changelog.je.CompositeDBCursor; |
| | | import org.opends.server.replication.server.changelog.je.ReplicaOfflineCursor; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.util.StaticUtils; |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugError("clear the FileChangelogDB"); |
| | | TRACER.debugInfo("clear the FileChangelogDB"); |
| | | } |
| | | if (!dbDirectory.exists()) |
| | | { |
| | |
| | | throws ChangelogException |
| | | { |
| | | final Set<Integer> serverIds = getDomainMap(baseDN).keySet(); |
| | | final ChangelogState state = replicationEnv.readChangelogState(); |
| | | final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size()); |
| | | for (int serverId : serverIds) |
| | | { |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null; |
| | | cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null); |
| | | final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN); |
| | | final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState); |
| | | cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null); |
| | | } |
| | | // recycle exhausted cursors, |
| | | // because client code will not manage the cursors itself |
| | | return new CompositeDBCursor<Void>(cursors, true); |
| | | } |
| | | |
| | | private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId, |
| | | ServerState startAfterServerState) |
| | | { |
| | | final List<CSN> domain = state.getOfflineReplicas().get(baseDN); |
| | | if (domain != null) |
| | | { |
| | | for (CSN offlineCSN : domain) |
| | | { |
| | | if (serverId == offlineCSN.getServerId() |
| | | && !startAfterServerState.cover(offlineCSN)) |
| | | { |
| | | return offlineCSN; |
| | | } |
| | | } |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN) |
| | |
| | | */ |
| | | void clearGenerationId(final DN domainDN) throws ChangelogException |
| | | { |
| | | synchronized(domainLock) |
| | | synchronized (domainLock) |
| | | { |
| | | final String domainId = domains.get(domainDN); |
| | | if (domainId == null) |
| | | { |
| | | return; // unknow domain => no-op |
| | | } |
| | | final File idFile = retrieveGenerationIdFile(getDomainPath(domainId)); |
| | | if (idFile != null) |
| | | { |
| | |
| | | { |
| | | clearGenerationId(baseDN); |
| | | final String domainId = domains.get(baseDN); |
| | | if (domainId == null) |
| | | { |
| | | return; // unknow domain => no-op |
| | | } |
| | | final File generationIdPath = getGenerationIdPath(domainId, NO_GENERATION_ID); |
| | | ensureGenerationIdFileExists(generationIdPath); |
| | | } |
| | |
| | | synchronized (domainLock) |
| | | { |
| | | final String domainId = domains.get(domainDN); |
| | | if (domainId == null) |
| | | { |
| | | return; // unknow domain => no-op |
| | | } |
| | | final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId()); |
| | | if (!serverIdPath.exists()) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_REPLICA_OFFLINE_WRONG_PATH.get( |
| | | domainDN.toString(), offlineCSN.getServerId(), serverIdPath.getPath())); |
| | | return; // no serverId anymore => no-op |
| | | } |
| | | final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME); |
| | | Writer writer = null; |
| | |
| | | synchronized (domainLock) |
| | | { |
| | | final String domainId = domains.get(domainDN); |
| | | if (domainId == null) |
| | | { |
| | | return; // unknow domain => no-op |
| | | } |
| | | final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME); |
| | | if (offlineFile.exists()) |
| | | { |
| | |
| | | throws ChangelogException |
| | | { |
| | | final File domainDirectory = getDomainPath(domainEntry.getValue()); |
| | | final String generationId = retrieveGenerationId(domainDirectory); |
| | | if (generationId == null) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_READ_STATE_NO_GENERATION_ID_FOUND.get( |
| | | replicationRootPath, domainDirectory.getPath())); |
| | | } |
| | | final DN domainDN = domainEntry.getKey(); |
| | | state.setDomainGenerationId(domainDN, toGenerationId(generationId)); |
| | | final String generationId = retrieveGenerationId(domainDirectory); |
| | | if (generationId != null) |
| | | { |
| | | state.setDomainGenerationId(domainDN, toGenerationId(generationId)); |
| | | } |
| | | |
| | | final File[] serverIds = domainDirectory.listFiles(SERVER_ID_FILE_FILTER); |
| | | if (serverIds == null) |
| | |
| | | throws ChangelogException |
| | | { |
| | | final Set<Integer> serverIds = getDomainMap(baseDN).keySet(); |
| | | final ChangelogState state = dbEnv.readChangelogState(); |
| | | final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size()); |
| | | for (int serverId : serverIds) |
| | | { |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null; |
| | | cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null); |
| | | final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN); |
| | | final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState); |
| | | cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null); |
| | | } |
| | | // recycle exhausted cursors, |
| | | // because client code will not manage the cursors itself |
| | | return new CompositeDBCursor<Void>(cursors, true); |
| | | } |
| | | |
| | | private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId, |
| | | ServerState startAfterServerState) |
| | | { |
| | | final List<CSN> domain = state.getOfflineReplicas().get(baseDN); |
| | | if (domain != null) |
| | | { |
| | | for (CSN offlineCSN : domain) |
| | | { |
| | | if (serverId == offlineCSN.getServerId() |
| | | && !startAfterServerState.cover(offlineCSN)) |
| | | { |
| | | return offlineCSN; |
| | | } |
| | | } |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN) |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.ReplicaOfflineMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | |
| | | /** |
| | |  * Implementation of a DBCursor that decorates an existing DBCursor |
| | |  * and returns a ReplicaOfflineMsg when the decorated DBCursor is exhausted |
| | |  * and the offline CSN is newer than the last returned update CSN. |
| | |  */ |
| | | public class ReplicaOfflineCursor implements DBCursor<UpdateMsg> |
| | | { |
| | |   /** @NonNull */ |
| | |   private final DBCursor<UpdateMsg> cursor; |
| | |   /** |
| | |    * The message still to be returned, null when there is none or once it has |
| | |    * been consumed or found outdated. |
| | |    */ |
| | |   private ReplicaOfflineMsg replicaOfflineMsg; |
| | |   /** |
| | |    * Whether calls to {@link #getRecord()} must return the {@link ReplicaOfflineMsg}. |
| | |    */ |
| | |   private boolean returnReplicaOfflineMsg; |
| | | |
| | |   /** |
| | |    * Creates a ReplicaOfflineCursor object with a cursor to decorate |
| | |    * and an offlineCSN to return as part of a ReplicaOfflineMsg. |
| | |    * |
| | |    * @param cursor |
| | |    *          the non-null underlying cursor that needs to be exhausted before |
| | |    *          we return a ReplicaOfflineMsg |
| | |    * @param offlineCSN |
| | |    *          The offline CSN from which to build the |
| | |    *          {@link ReplicaOfflineMsg} to return, null when there is no such |
| | |    *          message to return |
| | |    */ |
| | |   public ReplicaOfflineCursor(DBCursor<UpdateMsg> cursor, CSN offlineCSN) |
| | |   { |
| | |     this.replicaOfflineMsg = |
| | |         offlineCSN != null ? new ReplicaOfflineMsg(offlineCSN) : null; |
| | |     this.cursor = cursor; |
| | |   } |
| | | |
| | |   /** {@inheritDoc} */ |
| | |   @Override |
| | |   public UpdateMsg getRecord() |
| | |   { |
| | |     return returnReplicaOfflineMsg ? replicaOfflineMsg : cursor.getRecord(); |
| | |   } |
| | | |
| | |   /** |
| | |    * {@inheritDoc} |
| | |    * <p> |
| | |    * Advances the decorated cursor first. Only when it has no next record is |
| | |    * the ReplicaOfflineMsg exposed, and only if its CSN is newer than the CSN |
| | |    * of the record the decorated cursor was positioned on before this call. |
| | |    */ |
| | |   @Override |
| | |   public boolean next() throws ChangelogException |
| | |   { |
| | |     if (returnReplicaOfflineMsg) |
| | |     { |
| | |       // already consumed, never return it again... |
| | |       replicaOfflineMsg = null; |
| | |       returnReplicaOfflineMsg = false; |
| | |       // ...and verify if new changes have been added to the DB |
| | |       // (cursors are automatically restarted) |
| | |     } |
| | |     // remember the current record before advancing, to compare CSNs below |
| | |     final UpdateMsg lastUpdate = cursor.getRecord(); |
| | |     final boolean hasNext = cursor.next(); |
| | |     if (hasNext) |
| | |     { |
| | |       return true; |
| | |     } |
| | |     if (replicaOfflineMsg == null) |
| | |     { // no ReplicaOfflineMsg to return |
| | |       return false; |
| | |     } |
| | | |
| | |     // replicaDB just happened to be exhausted now |
| | |     if (lastUpdate != null |
| | |         && replicaOfflineMsg.getCSN().isOlderThanOrEqualTo(lastUpdate.getCSN())) |
| | |     { |
| | |       // offlineCSN is outdated, never return it |
| | |       replicaOfflineMsg = null; |
| | |       return false; |
| | |     } |
| | |     returnReplicaOfflineMsg = true; |
| | |     return true; |
| | |   } |
| | | |
| | |   /** {@inheritDoc} */ |
| | |   @Override |
| | |   public void close() |
| | |   { |
| | |     cursor.close(); |
| | |   } |
| | | |
| | |   /** {@inheritDoc} */ |
| | |   @Override |
| | |   public String toString() |
| | |   { |
| | |     // NOTE(review): the split on the empty pattern is kept as-is; confirm |
| | |     // whether a split on a space was intended here. |
| | |     final String[] cursorParts = cursor.toString().split("", 2); |
| | |     // Splitting may yield a single element (e.g. for an empty string): |
| | |     // fall back to it rather than failing with an out of bounds index. |
| | |     final String cursorDesc = |
| | |         cursorParts.length > 1 ? cursorParts[1] : cursorParts[0]; |
| | |     return getClass().getSimpleName() |
| | |         + " returnReplicaOfflineMsg=" + returnReplicaOfflineMsg |
| | |         + " offlineCSN=" |
| | |         + (replicaOfflineMsg != null ? replicaOfflineMsg.getCSN().toStringUI() : null) |
| | |         + " cursor=" + cursorDesc; |
| | |   } |
| | | |
| | | } |
| | |
| | | |
| | | synchronized (startStopLock) |
| | | { |
| | | domain.publishReplicaOfflineMsg(); |
| | | shutdown = true; |
| | | setConnectedRS(ConnectedRS.stopped()); |
| | | stopRSHeartBeatMonitoring(); |
| | |
| | | } |
| | | |
| | | /** |
| | |  * Publishes a replica offline message if all pending changes for current |
| | |  * replica have been sent out. |
| | |  * <p> |
| | |  * This default implementation does nothing: it is meant to be overridden. |
| | |  */ |
| | | public void publishReplicaOfflineMsg() |
| | | { |
| | |   // Here to be overridden |
| | | } |
| | | |
| | | /** |
| | | * Publish information to the Replication Service (not assured mode). |
| | | * |
| | | * @param msg The byte array containing the information that should |
| | |
| | | assertEquals(decodedMsg.getCSN(), expectedMsg.getCSN()); |
| | | } |
| | | |
| | | @Test |
| | | public void replicaOfflineMsgTest() throws Exception |
| | | { |
| | |   // encode with protocol V8, decode, then compare the CSNs |
| | |   final ReplicaOfflineMsg original = |
| | |       new ReplicaOfflineMsg(new CSN(System.currentTimeMillis(), 0, 42)); |
| | |   final byte[] encoded = original.getBytes(REPLICATION_PROTOCOL_V8); |
| | | |
| | |   final ReplicaOfflineMsg roundTripped = new ReplicaOfflineMsg(encoded); |
| | | |
| | |   assertEquals(roundTripped.getCSN(), original.getCSN()); |
| | | } |
| | | |
| | | /** |
| | | * Test that WindowMsg encoding and decoding works |
| | | * by checking that : msg == new WindowMsg(msg.getBytes()). |
| | |
| | | import java.util.Map; |
| | | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | |
| | | @BeforeClass |
| | | public void setupMsgs() |
| | | { |
| | | msg1 = newUpdateMsg(1); |
| | | msg2 = newUpdateMsg(2); |
| | | msg3 = newUpdateMsg(3); |
| | | msg4 = newUpdateMsg(4); |
| | | msg1 = new FakeUpdateMsg(1); |
| | | msg2 = new FakeUpdateMsg(2); |
| | | msg3 = new FakeUpdateMsg(3); |
| | | msg4 = new FakeUpdateMsg(4); |
| | | } |
| | | |
| | | @Test |
| | |
| | | of(msg4, baseDN1)); |
| | | } |
| | | |
| | | private UpdateMsg newUpdateMsg(final int t) |
| | | { |
| | |   // t is used for every CSN constructor argument and as the payload length |
| | |   final CSN csn = new CSN(t, t, t); |
| | |   final byte[] payload = new byte[t]; |
| | |   return new UpdateMsg(csn, payload) |
| | |   { |
| | |     /** {@inheritDoc} */ |
| | |     @Override |
| | |     public String toString() |
| | |     { |
| | |       return new StringBuilder("UpdateMsg(").append(t).append(")").toString(); |
| | |     } |
| | |   }; |
| | | } |
| | | |
| | | private CompositeDBCursor<String> newCompositeDBCursor( |
| | | Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception |
| | | { |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | |
| | | @SuppressWarnings("javadoc") |
| | | class FakeUpdateMsg extends UpdateMsg |
| | | { |
| | |   /** The value this message was built from, printed by toString(). */ |
| | |   private final int value; |
| | | |
| | |   FakeUpdateMsg(int t) |
| | |   { |
| | |     // t is used for every CSN constructor argument; the payload is one byte |
| | |     super(new CSN(t, t, t), new byte[1]); |
| | |     value = t; |
| | |   } |
| | | |
| | |   /** {@inheritDoc} */ |
| | |   @Override |
| | |   public String toString() |
| | |   { |
| | |     return new StringBuilder("UpdateMsg(").append(value).append(")").toString(); |
| | |   } |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import org.opends.server.replication.ReplicationTestCase; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.ReplicaOfflineMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.testng.annotations.BeforeTest; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | |
| | | /** |
| | |  * Tests for the ReplicaOfflineCursor class, decorating a SequentialDBCursor |
| | |  * with and without an offline CSN. |
| | |  */ |
| | | @SuppressWarnings("javadoc") |
| | | public class ReplicaOfflineCursorTest extends ReplicationTestCase |
| | | { |
| | | |
| | |   private int timestamp; |
| | |   private DBCursor<UpdateMsg> delegateCursor; |
| | | |
| | |   @BeforeTest |
| | |   public void init() |
| | |   { |
| | |     timestamp = 1; |
| | |   } |
| | | |
| | |   @Test |
| | |   public void cursorReturnsFalse() throws Exception |
| | |   { |
| | |     delegateCursor = new SequentialDBCursor(); |
| | | |
| | |     final ReplicaOfflineCursor offlineCursor = newCursor(null); |
| | |     assertNoCurrentRecord(offlineCursor); |
| | |     assertExhausted(offlineCursor); |
| | |   } |
| | | |
| | |   @Test |
| | |   public void cursorReturnsTrue() throws Exception |
| | |   { |
| | |     final UpdateMsg expectedUpdate = new FakeUpdateMsg(timestamp++); |
| | |     delegateCursor = new SequentialDBCursor(null, expectedUpdate); |
| | | |
| | |     final ReplicaOfflineCursor offlineCursor = newCursor(null); |
| | |     assertNoCurrentRecord(offlineCursor); |
| | |     assertNextIsUpdateMsg(offlineCursor, expectedUpdate); |
| | |     assertExhausted(offlineCursor); |
| | |   } |
| | | |
| | |   @Test |
| | |   public void cursorReturnsReplicaOfflineMsg() throws Exception |
| | |   { |
| | |     delegateCursor = new SequentialDBCursor(); |
| | | |
| | |     final CSN offlineCSN = new CSN(timestamp++, 1, 1); |
| | |     final ReplicaOfflineCursor offlineCursor = newCursor(offlineCSN); |
| | |     assertNoCurrentRecord(offlineCursor); |
| | |     assertNextIsReplicaOfflineMsg(offlineCursor, offlineCSN); |
| | |     assertExhausted(offlineCursor); |
| | |   } |
| | | |
| | |   @Test |
| | |   public void cursorReturnsUpdateMsgThenReplicaOfflineMsg() throws Exception |
| | |   { |
| | |     final UpdateMsg expectedUpdate = new FakeUpdateMsg(timestamp++); |
| | |     delegateCursor = new SequentialDBCursor(null, expectedUpdate); |
| | | |
| | |     // created after the update message, hence newer than it |
| | |     final CSN offlineCSN = new CSN(timestamp++, 1, 1); |
| | |     final ReplicaOfflineCursor offlineCursor = newCursor(offlineCSN); |
| | |     assertNoCurrentRecord(offlineCursor); |
| | |     assertNextIsUpdateMsg(offlineCursor, expectedUpdate); |
| | |     assertNextIsReplicaOfflineMsg(offlineCursor, offlineCSN); |
| | |     assertExhausted(offlineCursor); |
| | |   } |
| | | |
| | |   @Test |
| | |   public void cursorReturnsUpdateMsgThenNeverReturnsOutdatedReplicaOfflineMsg() throws Exception |
| | |   { |
| | |     // created before the update message, hence older than it |
| | |     final CSN outdatedOfflineCSN = new CSN(timestamp++, 1, 1); |
| | | |
| | |     final UpdateMsg expectedUpdate = new FakeUpdateMsg(timestamp++); |
| | |     delegateCursor = new SequentialDBCursor(null, expectedUpdate); |
| | | |
| | |     final ReplicaOfflineCursor offlineCursor = newCursor(outdatedOfflineCSN); |
| | |     assertNoCurrentRecord(offlineCursor); |
| | |     assertNextIsUpdateMsg(offlineCursor, expectedUpdate); |
| | |     assertExhausted(offlineCursor); |
| | |   } |
| | | |
| | |   /** Builds the cursor under test around the current delegate cursor. */ |
| | |   private ReplicaOfflineCursor newCursor(CSN offlineCSN) |
| | |   { |
| | |     return new ReplicaOfflineCursor(delegateCursor, offlineCSN); |
| | |   } |
| | | |
| | |   /** Checks that the cursor currently exposes no record. */ |
| | |   private void assertNoCurrentRecord(ReplicaOfflineCursor offlineCursor) |
| | |   { |
| | |     assertThat(offlineCursor.getRecord()).isNull(); |
| | |   } |
| | | |
| | |   /** Checks that the cursor advances onto the expected update message. */ |
| | |   private void assertNextIsUpdateMsg(ReplicaOfflineCursor offlineCursor, UpdateMsg expected) |
| | |       throws Exception |
| | |   { |
| | |     assertThat(offlineCursor.next()).isTrue(); |
| | |     assertThat(offlineCursor.getRecord()).isSameAs(expected); |
| | |   } |
| | | |
| | |   /** Checks that the cursor advances onto a ReplicaOfflineMsg with the expected CSN. */ |
| | |   private void assertNextIsReplicaOfflineMsg(ReplicaOfflineCursor offlineCursor, CSN expectedCSN) |
| | |       throws Exception |
| | |   { |
| | |     assertThat(offlineCursor.next()).isTrue(); |
| | |     final UpdateMsg current = offlineCursor.getRecord(); |
| | |     assertThat(current).isInstanceOf(ReplicaOfflineMsg.class); |
| | |     assertThat(current.getCSN()).isEqualTo(expectedCSN); |
| | |   } |
| | | |
| | |   /** Checks that the cursor cannot advance and exposes no record afterwards. */ |
| | |   private void assertExhausted(ReplicaOfflineCursor offlineCursor) throws Exception |
| | |   { |
| | |     assertThat(offlineCursor.next()).isFalse(); |
| | |     assertThat(offlineCursor.getRecord()).isNull(); |
| | |   } |
| | | |
| | | } |