OPENDJ-1116 Introduce abstraction for the changelog DB
In replication converted the use of String for baseDN to use actual DNs.
By looking at the code, I am a bit afraid that some paths in the code were doing DN.equals(String) or vice-versa. Problem is that the current changes might change the behaviour of these paths. One example I spotted is in LDAPReplicationDomain.isSolveConflict().
LDAPReplicationDomain.java:
Removed fields serverId and baseDn already held in parent class + Used getServerId(), getBaseDN() and getBaseDNString() + moved getBaseDN() to ReplicationDomain.
ReplicationDomain.java:
Moved getBaseDN() here from LDAPReplicationDomain.
ReplicationServer.java:
In getECLChangeNumberLimits(), removed unnecessary code parsing a String. Now, the code will not throw a DirectoryException when the cookie cannot be parsed. I do not think this is a problem since the result of the parsing was never used anyway.
ReplicationBroker.java:
In performPhaseOneHandshake(), used StaticUtils.close().
In performECLPhaseTwoHandshake(), changed return type to void.
*.java:
Converted baseDNs from String type to DN type.
Renamed a few getBaseDn() to getBaseDN().
*Test.java:
Removed useless try / catch / fail test anti patterns.
Changed some fake DNs to have DN-valid syntaxes.
| | |
| | | */ |
| | | package org.opends.server.replication.common; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | |
| | | import java.util.Iterator; |
| | | import java.util.Map; |
| | | import java.util.TreeMap; |
| | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.ResultCode; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | |
| | | /** |
| | | * This object is used to store a list of ServerState object, one by replication |
| | | * domain. Globally, it is the generalization of ServerState (that applies to |
| | |
| | | * MultiDomainServerState is also known as "cookie" and is used with the |
| | | * cookie-based changelog. |
| | | */ |
| | | public class MultiDomainServerState implements Iterable<String> |
| | | public class MultiDomainServerState implements Iterable<DN> |
| | | { |
| | | /** |
| | | * The list of (domain service id, ServerState). |
| | | */ |
| | | private Map<String, ServerState> list; |
| | | private Map<DN, ServerState> list; |
| | | |
| | | /** |
| | | * Creates a new empty object. |
| | | */ |
| | | public MultiDomainServerState() |
| | | { |
| | | list = new TreeMap<String, ServerState>(); |
| | | list = new TreeMap<DN, ServerState>(); |
| | | } |
| | | |
| | | /** |
| | |
| | | * |
| | | * @return a boolean indicating if the update was meaningful. |
| | | */ |
| | | public boolean update(String baseDN, CSN csn) |
| | | public boolean update(DN baseDN, CSN csn) |
| | | { |
| | | if (csn == null) |
| | | return false; |
| | |
| | | * @param baseDN The provided baseDN. |
| | | * @param serverState The provided serverState. |
| | | */ |
| | | public void update(String baseDN, ServerState serverState) |
| | | public void update(DN baseDN, ServerState serverState) |
| | | { |
| | | list.put(baseDN,serverState.duplicate()); |
| | | } |
| | |
| | | String res = ""; |
| | | if ((list != null) && (!list.isEmpty())) |
| | | { |
| | | for (String baseDN : list.keySet()) |
| | | for (DN baseDN : list.keySet()) |
| | | { |
| | | ServerState ss = list.get(baseDN); |
| | | res += baseDN + ":" + ss + ";"; |
| | |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public Iterator<String> iterator() |
| | | public Iterator<DN> iterator() |
| | | { |
| | | return list.keySet().iterator(); |
| | | } |
| | |
| | | */ |
| | | public boolean cover(MultiDomainServerState covered) |
| | | { |
| | | for (String baseDN : covered.list.keySet()) |
| | | for (DN baseDN : covered.list.keySet()) |
| | | { |
| | | ServerState state = list.get(baseDN); |
| | | ServerState coveredState = covered.list.get(baseDN); |
| | |
| | | * @exception DirectoryException when an error occurs |
| | | * @return the split state. |
| | | */ |
| | | public static Map<String,ServerState> splitGenStateToServerStates( |
| | | String multidomainserverstate) |
| | | throws DirectoryException |
| | | public static Map<DN, ServerState> splitGenStateToServerStates( |
| | | String multidomainserverstate) throws DirectoryException |
| | | { |
| | | Map<String, ServerState> startStates = new TreeMap<String, ServerState>(); |
| | | Map<DN, ServerState> startStates = new TreeMap<DN, ServerState>(); |
| | | if ((multidomainserverstate != null) |
| | | && (multidomainserverstate.length() > 0)) |
| | | { |
| | |
| | | serverStateByDomain.update(fromCSN); |
| | | } |
| | | } |
| | | startStates.put(domainBaseDN, serverStateByDomain); |
| | | startStates.put(DN.decode(domainBaseDN), serverStateByDomain); |
| | | } |
| | | } |
| | | catch (DirectoryException de) |
| | |
| | | /** |
| | | * The fully-qualified name of this class. |
| | | */ |
| | | private static final String CLASS_NAME = |
| | | "org.opends.server.replication.plugin.LDAPReplicationDomain"; |
| | | private static final String CLASS_NAME = LDAPReplicationDomain.class |
| | | .getName(); |
| | | |
| | | /** |
| | | * The attribute used to mark conflicting entries. |
| | |
| | | */ |
| | | private final RemotePendingChanges remotePendingChanges; |
| | | |
| | | private final int serverId; |
| | | |
| | | private final DN baseDn; |
| | | |
| | | private volatile boolean shutdown = false; |
| | | |
| | | private final InternalClientConnection conn = |
| | | InternalClientConnection.getRootConnection(); |
| | | |
| | | private boolean solveConflictFlag = true; |
| | | |
| | | private volatile boolean shutdown = false; |
| | | private volatile boolean disabled = false; |
| | | private volatile boolean stateSavingDisabled = false; |
| | | |
| | |
| | | { |
| | | protected ServerStateFlush() |
| | | { |
| | | super("Replica DS(" + serverId |
| | | + ") state checkpointer for domain \"" + baseDn + "\""); |
| | | super("Replica DS(" + getServerId() |
| | | + ") state checkpointer for domain \"" + getBaseDN() + "\""); |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | protected RSUpdater(CSN replServerMaxCSN) |
| | | { |
| | | super("Replica DS(" + serverId |
| | | + ") missing change publisher for domain \"" + baseDn + "\""); |
| | | super("Replica DS(" + getServerId() |
| | | + ") missing change publisher for domain \"" + getBaseDN() + "\""); |
| | | this.startCSN = replServerMaxCSN; |
| | | } |
| | | |
| | |
| | | * Log an error for the repair tool |
| | | * that will need to re-synchronize the servers. |
| | | */ |
| | | message = ERR_CANNOT_RECOVER_CHANGES.get(baseDn.toNormalizedString()); |
| | | message = ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString()); |
| | | logError(message); |
| | | } |
| | | } catch (Exception e) |
| | |
| | | * Log an error for the repair tool |
| | | * that will need to re-synchronize the servers. |
| | | */ |
| | | message = ERR_CANNOT_RECOVER_CHANGES.get(baseDn.toNormalizedString()); |
| | | message = ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString()); |
| | | logError(message); |
| | | } |
| | | finally |
| | |
| | | BlockingQueue<UpdateToReplay> updateToReplayQueue) |
| | | throws ConfigException |
| | | { |
| | | super(configuration.getBaseDN().toNormalizedString(), |
| | | super(configuration.getBaseDN(), |
| | | configuration.getServerId(), |
| | | configuration.getInitializationWindowSize()); |
| | | |
| | | // Read the configuration parameters. |
| | | Set<String> replicationServers = configuration.getReplicationServer(); |
| | | |
| | | this.serverId = configuration.getServerId(); |
| | | this.baseDn = configuration.getBaseDN(); |
| | | int window = configuration.getWindowSize(); |
| | | /** |
| | | * The time in milliseconds between heartbeats from the replication |
| | |
| | | readAssuredConfig(configuration, false); |
| | | |
| | | // Get fractional configuration |
| | | fractionalConfig = new FractionalConfig(baseDn); |
| | | fractionalConfig = new FractionalConfig(getBaseDN()); |
| | | readFractionalConfig(configuration, false); |
| | | |
| | | setGroupId((byte)configuration.getGroupId()); |
| | |
| | | |
| | | solveConflictFlag = isSolveConflict(configuration); |
| | | |
| | | Backend backend = retrievesBackend(baseDn); |
| | | Backend backend = retrievesBackend(getBaseDN()); |
| | | if (backend == null) |
| | | { |
| | | throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get( |
| | | baseDn.toNormalizedString())); |
| | | getBaseDNString())); |
| | | } |
| | | |
| | | try |
| | |
| | | catch (DirectoryException e) |
| | | { |
| | | logError(ERR_LOADING_GENERATION_ID.get( |
| | | baseDn.toNormalizedString(), e.getLocalizedMessage())); |
| | | getBaseDNString(), e.getLocalizedMessage())); |
| | | } |
| | | |
| | | /* |
| | | * Create a new Persistent Server State that will be used to store |
| | | * the last CSN seen from all LDAP servers in the topology. |
| | | */ |
| | | state = new PersistentServerState(baseDn, serverId, getServerState()); |
| | | state = new PersistentServerState(getBaseDN(), getServerId(), |
| | | getServerState()); |
| | | flushThread = new ServerStateFlush(); |
| | | |
| | | /* |
| | |
| | | */ |
| | | private boolean isSolveConflict(ReplicationDomainCfg cfg) |
| | | { |
| | | return !baseDn.equals(DirectoryServer.getSchemaDN()) |
| | | return !getBaseDN().equals(DirectoryServer.getSchemaDN()) |
| | | && cfg.isSolveConflicts(); |
| | | } |
| | | |
| | |
| | | // Should not happen as normally already called without problem in |
| | | // isConfigurationChangeAcceptable or isConfigurationAcceptable |
| | | // if we come up to this method |
| | | Message message = NOTE_ERR_FRACTIONAL.get(baseDn.toString(), |
| | | Message message = NOTE_ERR_FRACTIONAL.get(getBaseDNString(), |
| | | e.getLocalizedMessage()); |
| | | logError(message); |
| | | return; |
| | |
| | | catch (ConfigException e) |
| | | { |
| | | // Should not happen |
| | | Message message = NOTE_ERR_FRACTIONAL.get(baseDn.toString(), |
| | | Message message = NOTE_ERR_FRACTIONAL.get(getBaseDNString(), |
| | | e.getLocalizedMessage()); |
| | | logError(message); |
| | | return; |
| | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "Attempt to read the potential fractional config in domain root " |
| | | + "entry " + baseDn); |
| | | + "entry " + getBaseDN()); |
| | | |
| | | LDAPFilter filter; |
| | | try |
| | |
| | | } |
| | | |
| | | // Search the domain root entry that is used to save the generation id |
| | | ByteString asn1BaseDn = ByteString.valueOf(baseDn.toString()); |
| | | ByteString asn1BaseDn = ByteString.valueOf(getBaseDNString()); |
| | | Set<String> attributes = new LinkedHashSet<String>(3); |
| | | attributes.add(REPLICATION_GENERATION_ID); |
| | | attributes.add(REPLICATION_FRACTIONAL_EXCLUDE); |
| | |
| | | Message message = ERR_SEARCHING_GENERATION_ID.get( |
| | | search.getResultCode().getResultCodeName() + " " + |
| | | search.getErrorMessage(), |
| | | baseDn.toString()); |
| | | getBaseDNString()); |
| | | logError(message); |
| | | return false; |
| | | } |
| | |
| | | } |
| | | if (attr.size() > 1) |
| | | { |
| | | Message message = ERR_LOADING_GENERATION_ID.get(baseDn.toString(), |
| | | Message message = ERR_LOADING_GENERATION_ID.get(getBaseDNString(), |
| | | "#Values=" + attr.size() + " Must be exactly 1 in entry " |
| | | + resultEntry.toLDIFString()); |
| | | logError(message); |
| | |
| | | } |
| | | catch(DirectoryException e) |
| | | { |
| | | Message message = NOTE_ERR_FRACTIONAL.get(baseDn.toString(), |
| | | Message message = NOTE_ERR_FRACTIONAL.get(getBaseDNString(), |
| | | e.getLocalizedMessage()); |
| | | logError(message); |
| | | return FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES; |
| | |
| | | { |
| | | case IMPORT_ERROR_MESSAGE_BAD_REMOTE: |
| | | msg = NOTE_ERR_FULL_UPDATE_IMPORT_FRACTIONAL_BAD_REMOTE.get( |
| | | baseDn.toString(), Integer.toString(ieContext.getImportSource())); |
| | | getBaseDNString(), Integer.toString(ieContext.getImportSource())); |
| | | break; |
| | | case IMPORT_ERROR_MESSAGE_REMOTE_IS_FRACTIONAL: |
| | | msg = NOTE_ERR_FULL_UPDATE_IMPORT_FRACTIONAL_REMOTE_IS_FRACTIONAL.get( |
| | | baseDn.toString(), Integer.toString(ieContext.getImportSource())); |
| | | getBaseDNString(), Integer.toString(ieContext.getImportSource())); |
| | | break; |
| | | } |
| | | ieContext.setException(new DirectoryException(UNWILLING_TO_PERFORM, msg)); |
| | |
| | | if (target == RoutableMsg.ALL_SERVERS && fractionalConfig.isFractional()) |
| | | { |
| | | Message msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_FULL_UPDATE_FRACTIONAL.get( |
| | | baseDn.toString(), Integer.toString(getServerId())); |
| | | getBaseDNString(), Integer.toString(getServerId())); |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, msg); |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns the base DN of this ReplicationDomain. |
| | | * |
| | | * @return The base DN of this ReplicationDomain |
| | | */ |
| | | public DN getBaseDN() |
| | | { |
| | | return baseDn; |
| | | } |
| | | |
| | | /** |
| | | * Implement the handleConflictResolution phase of the deleteOperation. |
| | | * |
| | | * @param deleteOperation The deleteOperation. |
| | |
| | | { |
| | | if (!deleteOperation.isSynchronizationOperation() && !brokerIsConnected()) |
| | | { |
| | | Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString()); |
| | | Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDNString()); |
| | | return new SynchronizationProviderResult.StopProcessing( |
| | | ResultCode.UNWILLING_TO_PERFORM, msg); |
| | | } |
| | |
| | | { |
| | | if (!addOperation.isSynchronizationOperation() && !brokerIsConnected()) |
| | | { |
| | | Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString()); |
| | | Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDNString()); |
| | | return new SynchronizationProviderResult.StopProcessing( |
| | | ResultCode.UNWILLING_TO_PERFORM, msg); |
| | | } |
| | |
| | | StringBuilder sb = new StringBuilder(); |
| | | addOperation.toString(sb); |
| | | Message msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get( |
| | | baseDn.toString(), sb.toString()); |
| | | getBaseDNString(), sb.toString()); |
| | | return new SynchronizationProviderResult.StopProcessing( |
| | | ResultCode.UNWILLING_TO_PERFORM, msg); |
| | | } |
| | |
| | | { |
| | | if (!modifyDNOperation.isSynchronizationOperation() && !brokerIsConnected()) |
| | | { |
| | | Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString()); |
| | | Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDNString()); |
| | | return new SynchronizationProviderResult.StopProcessing( |
| | | ResultCode.UNWILLING_TO_PERFORM, msg); |
| | | } |
| | |
| | | StringBuilder sb = new StringBuilder(); |
| | | modifyDNOperation.toString(sb); |
| | | Message msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get( |
| | | baseDn.toString(), sb.toString()); |
| | | getBaseDNString(), sb.toString()); |
| | | return new SynchronizationProviderResult.StopProcessing( |
| | | ResultCode.UNWILLING_TO_PERFORM, msg); |
| | | } |
| | |
| | | { |
| | | if (!modifyOperation.isSynchronizationOperation() && !brokerIsConnected()) |
| | | { |
| | | Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString()); |
| | | Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDNString()); |
| | | return new SynchronizationProviderResult.StopProcessing( |
| | | ResultCode.UNWILLING_TO_PERFORM, msg); |
| | | } |
| | |
| | | StringBuilder sb = new StringBuilder(); |
| | | modifyOperation.toString(sb); |
| | | Message msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get( |
| | | baseDn.toString(), sb.toString()); |
| | | getBaseDNString(), sb.toString()); |
| | | return new SynchronizationProviderResult.StopProcessing( |
| | | ResultCode.UNWILLING_TO_PERFORM, msg); |
| | | } |
| | |
| | | // that is replicated, the generation is now lost because the |
| | | // DB is empty. We need to save it again the next time we add an entry. |
| | | if (op.getOperationType().equals(OperationType.DELETE) |
| | | && ((PostOperationDeleteOperation) op).getEntryDN().equals(baseDn)) |
| | | && ((PostOperationDeleteOperation) op) |
| | | .getEntryDN().equals(getBaseDN())) |
| | | { |
| | | generationIdSavedStatus = false; |
| | | } |
| | |
| | | ByteString.valueOf(freedDN.toString())); |
| | | |
| | | InternalSearchOperation searchOp = conn.processSearch( |
| | | ByteString.valueOf(baseDn.toString()), |
| | | ByteString.valueOf(getBaseDNString()), |
| | | SearchScope.WHOLE_SUBTREE, |
| | | DereferencePolicy.NEVER_DEREF_ALIASES, |
| | | 0, 0, false, filter, |
| | |
| | | { |
| | | try |
| | | { |
| | | InternalSearchOperation search = conn.processSearch(baseDn, |
| | | InternalSearchOperation search = conn.processSearch(getBaseDN(), |
| | | SearchScope.WHOLE_SUBTREE, |
| | | SearchFilter.createFilterFromString("entryuuid="+uuid)); |
| | | if (search.getResultCode() == ResultCode.SUCCESS) |
| | |
| | | addConflict(msg); |
| | | |
| | | msg.setDn(generateConflictRDN(entryUUID, |
| | | op.getEntryDN().getRDN().toString()) + "," |
| | | + baseDn); |
| | | op.getEntryDN().getRDN().toString()) + "," + getBaseDN()); |
| | | // reset the parent entryUUID so that the check done is the |
| | | // handleConflict phase does not fail. |
| | | msg.setParentEntryUUID(null); |
| | |
| | | DirectoryServer.sendAlertNotification(this, |
| | | ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage); |
| | | |
| | | ModifyDNOperation newOp = |
| | | renameEntry(dn, generateDeleteConflictDn(entryUUID, dn), baseDn, true); |
| | | RDN newRDN = generateDeleteConflictDn(entryUUID, dn); |
| | | ModifyDNOperation newOp = renameEntry(dn, newRDN, getBaseDN(), true); |
| | | |
| | | if (newOp.getResultCode() != ResultCode.SUCCESS) |
| | | { |
| | |
| | | state.clearInMemory(); |
| | | state.loadState(); |
| | | |
| | | generator.adjust(state.getMaxCSN(serverId)); |
| | | generator.adjust(state.getMaxCSN(getServerId())); |
| | | // Retrieves the generation ID associated with the data imported |
| | | |
| | | generationId = loadGenerationId(); |
| | |
| | | * should we stop the modifications ? |
| | | */ |
| | | logError(ERR_LOADING_GENERATION_ID.get( |
| | | baseDn.toNormalizedString(), e.getLocalizedMessage())); |
| | | getBaseDNString(), e.getLocalizedMessage())); |
| | | return; |
| | | } |
| | | |
| | |
| | | */ |
| | | public ResultCode saveGenerationId(long generationId) |
| | | { |
| | | ResultCode result = runSaveGenerationId(baseDn, generationId); |
| | | ResultCode result = runSaveGenerationId(getBaseDN(), generationId); |
| | | |
| | | if (result != ResultCode.SUCCESS) |
| | | { |
| | |
| | | if (result != ResultCode.SUCCESS) |
| | | { |
| | | Message message = ERR_UPDATING_GENERATION_ID.get( |
| | | result.getResultCodeName() + " " , |
| | | baseDn.toString()); |
| | | result.getResultCodeName() + " " , getBaseDNString()); |
| | | logError(message); |
| | | } |
| | | } |
| | |
| | | private long loadGenerationId() throws DirectoryException |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Attempt to read generation ID from DB " + baseDn); |
| | | TRACER.debugInfo("Attempt to read generation ID from DB " + getBaseDN()); |
| | | |
| | | /* |
| | | * Search the database entry that is used to periodically |
| | |
| | | final Set<String> attributes = new LinkedHashSet<String>(1); |
| | | attributes.add(REPLICATION_GENERATION_ID); |
| | | final String filter = "(objectclass=*)"; |
| | | InternalSearchOperation search = conn.processSearch(baseDn.toString(), |
| | | InternalSearchOperation search = conn.processSearch(getBaseDNString(), |
| | | SearchScope.BASE_OBJECT, |
| | | DereferencePolicy.DEREF_ALWAYS, 0, 0, false, |
| | | filter,attributes); |
| | |
| | | Message message = ERR_SEARCHING_GENERATION_ID.get( |
| | | search.getResultCode().getResultCodeName() + " " + |
| | | search.getErrorMessage(), |
| | | baseDn.toString()); |
| | | getBaseDNString()); |
| | | logError(message); |
| | | } |
| | | } |
| | |
| | | if (attr.size()>1) |
| | | { |
| | | Message message = ERR_LOADING_GENERATION_ID.get( |
| | | baseDn.toString(), "#Values=" + attr.size() + |
| | | " Must be exactly 1 in entry " + |
| | | resultEntry.toLDIFString()); |
| | | getBaseDNString(), "#Values=" + attr.size() + |
| | | " Must be exactly 1 in entry " + resultEntry.toLDIFString()); |
| | | logError(message); |
| | | } |
| | | else if (attr.size() == 1) |
| | |
| | | catch(Exception e) |
| | | { |
| | | Message message = ERR_LOADING_GENERATION_ID.get( |
| | | baseDn.toString(), e.getLocalizedMessage()); |
| | | getBaseDNString(), e.getLocalizedMessage()); |
| | | logError(message); |
| | | } |
| | | } |
| | |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Generation ID created for domain base DN=" |
| | | + baseDn + " generationId=" + aGenerationId); |
| | | + getBaseDN() + " generationId=" + aGenerationId); |
| | | } |
| | | else |
| | | { |
| | | generationIdSavedStatus = true; |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Generation ID successfully read from domain base DN=" |
| | | + baseDn + " generationId=" + aGenerationId); |
| | | + getBaseDN() + " generationId=" + aGenerationId); |
| | | } |
| | | return aGenerationId; |
| | | } |
| | |
| | | private long exportBackend(OutputStream output, boolean checksumOutput) |
| | | throws DirectoryException |
| | | { |
| | | Backend backend = retrievesBackend(this.baseDn); |
| | | Backend backend = retrievesBackend(getBaseDN()); |
| | | |
| | | // Acquire a shared lock for the backend. |
| | | try |
| | |
| | | throw new DirectoryException(ResultCode.OTHER, message, null); |
| | | } |
| | | |
| | | long numberOfEntries = backend.numSubordinates(baseDn, true) + 1; |
| | | long numberOfEntries = backend.numSubordinates(getBaseDN(), true) + 1; |
| | | long entryCount = Math.min(numberOfEntries, 1000); |
| | | OutputStream os; |
| | | ReplLDIFOutputStream ros = null; |
| | |
| | | |
| | | // baseDn branch is the only one included in the export |
| | | List<DN> includeBranches = new ArrayList<DN>(1); |
| | | includeBranches.add(this.baseDn); |
| | | includeBranches.add(getBaseDN()); |
| | | LDIFExportConfig exportConfig = new LDIFExportConfig(os); |
| | | exportConfig.setIncludeBranches(includeBranches); |
| | | |
| | |
| | | * Retrieves the backend related to the domain. |
| | | * |
| | | * @return The backend of that domain. |
| | | * @param baseDn The baseDn to retrieve the backend |
| | | * @param baseDN The baseDN to retrieve the backend |
| | | */ |
| | | protected static Backend retrievesBackend(DN baseDn) |
| | | protected static Backend retrievesBackend(DN baseDN) |
| | | { |
| | | // Retrieves the backend related to this domain |
| | | return DirectoryServer.getBackend(baseDn); |
| | | return DirectoryServer.getBackend(baseDN); |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | LDIFImportConfig importConfig = null; |
| | | |
| | | Backend backend = retrievesBackend(baseDn); |
| | | Backend backend = retrievesBackend(getBaseDN()); |
| | | |
| | | try |
| | | { |
| | |
| | | } |
| | | else |
| | | { |
| | | importConfig = |
| | | new LDIFImportConfig(input); |
| | | importConfig = new LDIFImportConfig(input); |
| | | List<DN> includeBranches = new ArrayList<DN>(); |
| | | includeBranches.add(this.baseDn); |
| | | includeBranches.add(getBaseDN()); |
| | | importConfig.setIncludeBranches(includeBranches); |
| | | importConfig.setAppendToExistingData(false); |
| | | importConfig.setSkipDNValidation(true); |
| | |
| | | { |
| | | importConfig.close(); |
| | | closeBackendImport(backend); // Re-enable backend |
| | | backend = retrievesBackend(baseDn); |
| | | backend = retrievesBackend(getBaseDN()); |
| | | } |
| | | |
| | | loadDataState(); |
| | |
| | | */ |
| | | public Backend getBackend() |
| | | { |
| | | return retrievesBackend(baseDn); |
| | | return retrievesBackend(getBaseDN()); |
| | | } |
| | | |
| | | /* |
| | |
| | | ReplicationDomainCfg configuration, List<Message> unacceptableReasons) |
| | | { |
| | | // Check that there is not already a domain with the same DN |
| | | DN dn = configuration.getBaseDN(); |
| | | final DN dn = configuration.getBaseDN(); |
| | | LDAPReplicationDomain domain = MultimasterReplication.findDomain(dn, null); |
| | | if (domain != null && domain.baseDn.equals(dn)) |
| | | if (domain != null && domain.getBaseDN().equals(dn)) |
| | | { |
| | | Message message = ERR_SYNC_INVALID_DN.get(); |
| | | unacceptableReasons.add(message); |
| | |
| | | { |
| | | throw new ConfigException( |
| | | NOTE_ERR_UNABLE_TO_ENABLE_ECL.get( |
| | | "Replication Domain on" + baseDn, |
| | | "Replication Domain on" + getBaseDN(), |
| | | de.getMessage() + " " + de.getCause().getMessage()), de); |
| | | } |
| | | } |
| | |
| | | { |
| | | Message message = |
| | | NOTE_ERR_UNABLE_TO_ENABLE_ECL.get( |
| | | "Replication Domain on" + baseDn, |
| | | "Replication Domain on" + getBaseDN(), |
| | | de.getMessage() + " " + de.getCause().getMessage()); |
| | | logError(message); |
| | | // and go on |
| | |
| | | setNewStatus(StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT); |
| | | broker.signalStatusChange(status); |
| | | Message message = NOTE_FRACTIONAL_BAD_DATA_SET_NEED_RESYNC.get( |
| | | baseDn.toString()); |
| | | getBaseDNString()); |
| | | logError(message); |
| | | return; // Do not send changes to the replication server |
| | | } |
| | |
| | | * Check that the ReplicationServer has seen all our previous |
| | | * changes. |
| | | */ |
| | | CSN replServerMaxCSN = replicationServerState.getCSN(serverId); |
| | | CSN replServerMaxCSN = replicationServerState.getCSN(getServerId()); |
| | | |
| | | // we don't want to update from here (a DS) an empty RS because |
| | | // normally the RS should have been updated by other RSes except for |
| | |
| | | // and we don't want to update it with our changes that could be huge. |
| | | if (replServerMaxCSN != null && replServerMaxCSN.getSeqnum() != 0) |
| | | { |
| | | CSN ourMaxCSN = state.getMaxCSN(serverId); |
| | | CSN ourMaxCSN = state.getMaxCSN(getServerId()); |
| | | if (ourMaxCSN != null && !ourMaxCSN.olderOrEqual(replServerMaxCSN)) |
| | | { |
| | | pendingChanges.setRecovering(true); |
| | |
| | | } |
| | | } catch (Exception e) |
| | | { |
| | | Message message = ERR_PUBLISHING_FAKE_OPS.get(baseDn.toNormalizedString(), |
| | | Message message = ERR_PUBLISHING_FAKE_OPS.get(getBaseDNString(), |
| | | e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | } |
| | |
| | | // So we search by interval of 10 seconds and store the results in the |
| | | // replayOperations list so that they are sorted before sending them. |
| | | long missingChangesDelta = currentStartCSN.getTime() + 10000; |
| | | CSN endCSN = new CSN(missingChangesDelta, 0xffffffff, serverId); |
| | | CSN endCSN = new CSN(missingChangesDelta, 0xffffffff, getServerId()); |
| | | |
| | | ScanSearchListener listener = |
| | | new ScanSearchListener(currentStartCSN, endCSN); |
| | | op = searchForChangedEntries(baseDn, currentStartCSN, endCSN, listener); |
| | | op = searchForChangedEntries(getBaseDN(), currentStartCSN, endCSN, |
| | | listener); |
| | | |
| | | // Publish and remove all the changes from the replayOperations list |
| | | // that are older than the endCSN. |
| | |
| | | @Override |
| | | public long countEntries() throws DirectoryException |
| | | { |
| | | Backend backend = retrievesBackend(baseDn); |
| | | Backend backend = retrievesBackend(getBaseDN()); |
| | | if (!backend.supportsLDIFExport()) |
| | | { |
| | | Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get( |
| | |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | |
| | | return backend.numSubordinates(baseDn, true) + 1; |
| | | return backend.numSubordinates(getBaseDN(), true) + 1; |
| | | } |
| | | |
| | | /** |
| | |
| | | try |
| | | { |
| | | source = Integer.decode(sourceString); |
| | | if (source >= -1 && source != serverId) |
| | | if (source >= -1 && source != getServerId()) |
| | | { |
| | | // TODO Verifies serverID is in the domain |
| | | // We should check here that this is a server implied |
| | |
| | | if (cause != null) |
| | | { |
| | | Message message = ERR_INVALID_IMPORT_SOURCE.get( |
| | | baseDn.toNormalizedString(), Integer.toString(serverId), |
| | | getBaseDNString(), Integer.toString(getServerId()), |
| | | Integer.toString(source),"Details:" + cause.getLocalizedMessage()); |
| | | throw new DirectoryException(resultCode, message, cause); |
| | | } |
| | | Message message = ERR_INVALID_IMPORT_SOURCE.get( |
| | | baseDn.toNormalizedString(), Integer.toString(serverId), |
| | | Integer.toString(source),""); |
| | | Message message = ERR_INVALID_IMPORT_SOURCE.get(getBaseDNString(), |
| | | Integer.toString(getServerId()), Integer.toString(source), ""); |
| | | throw new DirectoryException(resultCode, message); |
| | | } |
| | | |
| | |
| | | long endDate) throws DirectoryException |
| | | { |
| | | TRACER.debugInfo("[PURGE] purgeConflictsHistorical " |
| | | + "on domain: " + baseDn |
| | | + "on domain: " + getBaseDN() |
| | | + "endDate:" + new Date(endDate) |
| | | + "lastCSNPurgedFromHist: " |
| | | + lastCSNPurgedFromHist.toStringUI()); |
| | |
| | | } |
| | | |
| | | InternalSearchOperation searchOp = conn.processSearch( |
| | | ByteString.valueOf(baseDn.toString()), |
| | | ByteString.valueOf(getBaseDNString()), |
| | | SearchScope.WHOLE_SUBTREE, |
| | | DereferencePolicy.NEVER_DEREF_ALIASES, |
| | | 0, 0, false, filter, |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | } |
| | |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | |
| | | import org.opends.server.types.DN; |
| | | |
| | | /** |
| | | * This is the changelog state stored in the changelogStateDB. For each |
| | | * replication domain, it contains: |
| | |
| | | public class ChangelogState |
| | | { |
| | | |
| | | private final Map<String, Long> domainToGenerationId = |
| | | new HashMap<String, Long>(); |
| | | private final Map<String, List<Integer>> domainToServerIds = |
| | | new HashMap<String, List<Integer>>(); |
| | | private final Map<DN, Long> domainToGenerationId = new HashMap<DN, Long>(); |
| | | private final Map<DN, List<Integer>> domainToServerIds = |
| | | new HashMap<DN, List<Integer>>(); |
| | | |
| | | /** |
| | | * Sets the generationId for the supplied replication domain. |
| | | * |
| | | * @param baseDn |
| | | * @param baseDN |
| | | * the targeted replication domain baseDN |
| | | * @param generationId |
| | | * the generation Id to set |
| | | */ |
| | | public void setDomainGenerationId(String baseDn, long generationId) |
| | | public void setDomainGenerationId(DN baseDN, long generationId) |
| | | { |
| | | domainToGenerationId.put(baseDn, generationId); |
| | | domainToGenerationId.put(baseDN, generationId); |
| | | } |
| | | |
| | | /** |
| | |
| | | * |
| | | * @param serverId |
| | | * the serverId to add |
| | | * @param baseDn |
| | | * @param baseDN |
| | | * the targeted replication domain baseDN |
| | | */ |
| | | public void addServerIdToDomain(int serverId, String baseDn) |
| | | public void addServerIdToDomain(int serverId, DN baseDN) |
| | | { |
| | | List<Integer> serverIds = domainToServerIds.get(baseDn); |
| | | List<Integer> serverIds = domainToServerIds.get(baseDN); |
| | | if (serverIds == null) |
| | | { |
| | | serverIds = new LinkedList<Integer>(); |
| | | domainToServerIds.put(baseDn, serverIds); |
| | | domainToServerIds.put(baseDN, serverIds); |
| | | } |
| | | serverIds.add(serverId); |
| | | } |
| | |
| | | * |
| | | * @return a Map of domainBaseDN => generationId |
| | | */ |
| | | public Map<String, Long> getDomainToGenerationId() |
| | | public Map<DN, Long> getDomainToGenerationId() |
| | | { |
| | | return domainToGenerationId; |
| | | } |
| | |
| | | * |
| | | * @return a Map of domainBaseDN => List<serverId>. |
| | | */ |
| | | public Map<String, List<Integer>> getDomainToServerIds() |
| | | public Map<DN, List<Integer>> getDomainToServerIds() |
| | | { |
| | | return domainToServerIds; |
| | | } |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RS " + replicationServer.getServerId() |
| | | + ", DS " + getServerId() + " for baseDn " + getBaseDN() |
| | | + ", DS " + getServerId() + " for baseDN=" + getBaseDN() |
| | | + " has already generation id " + newGenId |
| | | + " so no ChangeStatusMsg sent to him."); |
| | | } |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RS " + replicationServer.getServerId() |
| | | + ", closing connection to DS " + getServerId() + " for baseDn " |
| | | + ", closing connection to DS " + getServerId() + " for baseDN=" |
| | | + getBaseDN() + " to force reconnection as new local" |
| | | + " generationId and remote one match and DS is in bad gen id: " |
| | | + newGenId); |
| | |
| | | { |
| | | TRACER.debugInfo("In RS " + replicationServer.getServerId() |
| | | + " Sending change status " + origin + " to " + getServerId() |
| | | + " for baseDn " + getBaseDN() + ":\n" + csMsg); |
| | | + " for baseDN=" + getBaseDN() + ":\n" + csMsg); |
| | | } |
| | | |
| | | session.publish(csMsg); |
| | |
| | | heartbeatInterval = serverStartMsg.getHeartbeatInterval(); |
| | | |
| | | // generic stuff |
| | | setBaseDNAndDomain(serverStartMsg.getBaseDn(), true); |
| | | DN baseDN = DN.decode(serverStartMsg.getBaseDn()); |
| | | setBaseDNAndDomain(baseDN, true); |
| | | setInitialServerState(serverStartMsg.getServerState()); |
| | | setSendWindowSize(serverStartMsg.getWindowSize()); |
| | | |
| | |
| | | |
| | | Message message = INFO_REPLICATION_SERVER_CONNECTION_FROM_DS |
| | | .get(getReplicationServerId(), getServerId(), |
| | | replicationServerDomain.getBaseDn(), |
| | | replicationServerDomain.getBaseDN().toNormalizedString(), |
| | | session.getReadableRemoteAddress()); |
| | | logError(message); |
| | | |
| | |
| | | if (serverId != 0) |
| | | { |
| | | return "Replica DS(" + serverId + ") for domain \"" |
| | | + replicationServerDomain.getBaseDn() + "\""; |
| | | + replicationServerDomain.getBaseDN() + "\""; |
| | | } |
| | | return "Unknown server"; |
| | | } |
| | |
| | | super(session, queueSize, replicationServer, rcvWindowSize); |
| | | try |
| | | { |
| | | setBaseDNAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT, true); |
| | | DN baseDN = DN.decode(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); |
| | | setBaseDNAndDomain(baseDN, true); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | |
| | | Depending on allowUnknownDomains provided flag, a non empty map will |
| | | be considered as an error when allowUnknownDomains is false. |
| | | */ |
| | | Map<String,ServerState> startStatesFromProvidedCookie = |
| | | new HashMap<String,ServerState>(); |
| | | Map<DN, ServerState> startStatesFromProvidedCookie = |
| | | new HashMap<DN, ServerState>(); |
| | | |
| | | ReplicationServer rs = this.replicationServer; |
| | | |
| | |
| | | continue; |
| | | |
| | | // skip the excluded domains |
| | | if (excludedBaseDNs.contains(rsd.getBaseDn())) |
| | | if (excludedBaseDNs.contains(rsd.getBaseDN().toNormalizedString())) |
| | | { |
| | | // this is an excluded domain |
| | | if (allowUnknownDomains) |
| | | startStatesFromProvidedCookie.remove(rsd.getBaseDn()); |
| | | startStatesFromProvidedCookie.remove(rsd.getBaseDN()); |
| | | continue; |
| | | } |
| | | |
| | |
| | | if (isPersistent == PERSISTENT_CHANGES_ONLY) |
| | | { |
| | | newDomainCtxt.startState = rsd.getEligibleState(eligibleCSN); |
| | | startStatesFromProvidedCookie.remove(rsd.getBaseDn()); |
| | | startStatesFromProvidedCookie.remove(rsd.getBaseDN()); |
| | | } |
| | | else |
| | | { |
| | | // let's take the start state for this domain from the provided |
| | | // cookie |
| | | newDomainCtxt.startState = |
| | | startStatesFromProvidedCookie.remove(rsd.getBaseDn()); |
| | | startStatesFromProvidedCookie.remove(rsd.getBaseDN()); |
| | | |
| | | if (providedCookie == null |
| | | || providedCookie.length() == 0 |
| | |
| | | // when there is a cookie provided in the request, |
| | | if (newDomainCtxt.startState == null) |
| | | { |
| | | missingDomains += (rsd.getBaseDn() + ":;"); |
| | | missingDomains += (rsd.getBaseDN() + ":;"); |
| | | continue; |
| | | } |
| | | else if (!newDomainCtxt.startState.isEmpty()) |
| | |
| | | if (hasCookieBeenTrimmedFromDB(rsd, newDomainCtxt.startState)) |
| | | { |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, |
| | | ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE |
| | | .get(newDomainCtxt.rsd.getBaseDn())); |
| | | ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get( |
| | | newDomainCtxt.rsd.getBaseDN().toNormalizedString())); |
| | | } |
| | | } |
| | | } |
| | |
| | | // Creates an unconnected SH for the domain |
| | | MessageHandler mh = new MessageHandler(maxQueueSize, replicationServer); |
| | | mh.setInitialServerState(newDomainCtxt.startState); |
| | | mh.setBaseDNAndDomain(rsd.getBaseDn(), false); |
| | | mh.setBaseDNAndDomain(rsd.getBaseDN(), false); |
| | | // register the unconnected into the domain |
| | | rsd.registerHandler(mh); |
| | | newDomainCtxt.mh = mh; |
| | | |
| | | previousCookie.update(newDomainCtxt.rsd.getBaseDn(), |
| | | previousCookie.update(newDomainCtxt.rsd.getBaseDN(), |
| | | newDomainCtxt.startState); |
| | | |
| | | // store the new context |
| | |
| | | if (!startStatesFromProvidedCookie.isEmpty()) |
| | | { |
| | | if (allowUnknownDomains) |
| | | for (String providedDomain : startStatesFromProvidedCookie.keySet()) |
| | | for (DN providedDomain : startStatesFromProvidedCookie.keySet()) |
| | | if (rs.getReplicationServerDomain(providedDomain) == null) |
| | | // the domain provided in the cookie is not replicated |
| | | startStatesFromProvidedCookie.remove(providedDomain); |
| | |
| | | */ |
| | | StringBuilder sb = new StringBuilder(); |
| | | for (DomainContext domainCtxt : domainCtxts) { |
| | | sb.append(domainCtxt.rsd.getBaseDn()).append(":") |
| | | sb.append(domainCtxt.rsd.getBaseDN()).append(":") |
| | | .append(domainCtxt.startState).append(";"); |
| | | } |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, |
| | |
| | | final ECLUpdateMsg change = new ECLUpdateMsg( |
| | | (LDAPUpdateMsg) oldestContext.nextMsg, |
| | | null, // cookie will be set later |
| | | oldestContext.rsd.getBaseDn(), |
| | | oldestContext.rsd.getBaseDN().toNormalizedString(), |
| | | 0); // changeNumber may be set later |
| | | oldestContext.nextMsg = null; |
| | | |
| | |
| | | final ECLUpdateMsg change = new ECLUpdateMsg( |
| | | (LDAPUpdateMsg) oldestContext.nextMsg, |
| | | null, // set later |
| | | oldestContext.rsd.getBaseDn(), |
| | | oldestContext.rsd.getBaseDN().toNormalizedString(), |
| | | 0); |
| | | oldestContext.nextMsg = null; // clean |
| | | |
| | |
| | | TRACER.debugInfo("getNextECLUpdate updates previousCookie:" + csn); |
| | | |
| | | // Update the current state |
| | | previousCookie.update(oldestChange.getBaseDN(), csn); |
| | | previousCookie.update(DN.decode(oldestChange.getBaseDN()), csn); |
| | | |
| | | // Set the current value of global state in the returned message |
| | | oldestChange.setCookie(previousCookie); |
| | |
| | | |
| | | // replogCSN : the oldest change from the changelog db |
| | | CSN csnFromChangelogDb = oldestChange.getUpdateMsg().getCSN(); |
| | | String dnFromChangelogDb = oldestChange.getBaseDN(); |
| | | DN dnFromChangelogDb = DN.decode(oldestChange.getBaseDN()); |
| | | |
| | | while (true) |
| | | { |
| | |
| | | // the next change from the CNIndexDB |
| | | final CNIndexRecord currentRecord = cnIndexDBCursor.getRecord(); |
| | | final CSN csnFromDraftCNDb = currentRecord.getCSN(); |
| | | final String dnFromDraftCNDb = currentRecord.getBaseDN(); |
| | | final DN dnFromDraftCNDb = currentRecord.getBaseDN(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("assignChangeNumber() generating change number " |
| | |
| | | } |
| | | } |
| | | |
| | | private boolean areSameChange(CSN csn1, String dn1, CSN csn2, String dn2) |
| | | private boolean areSameChange(CSN csn1, DN dn1, CSN csn2, DN dn2) |
| | | { |
| | | boolean sameDN = dn1.compareTo(dn2) == 0; |
| | | boolean sameCSN = csn1.compareTo(csn2) == 0; |
| | |
| | | replicationServer.getChangeNumberIndexDB().addRecord(new CNIndexRecord( |
| | | change.getChangeNumber(), |
| | | previousCookie.toString(), |
| | | change.getBaseDN(), |
| | | DN.decode(change.getBaseDN()), |
| | | change.getUpdateMsg().getCSN())); |
| | | } |
| | | |
| | |
| | | List<Attribute> attributes = new ArrayList<Attribute>(); |
| | | |
| | | attributes.add(Attributes.create("server-id", String.valueOf(serverId))); |
| | | attributes.add(Attributes.create("domain-name", rsDomain.getBaseDn())); |
| | | attributes.add(Attributes.create("domain-name", |
| | | rsDomain.getBaseDN().toNormalizedString())); |
| | | attributes.add(Attributes.create("connected-to", |
| | | replServerHandler.getMonitorInstanceName())); |
| | | |
| | |
| | | /** |
| | | * Local hosting RS. |
| | | */ |
| | | protected ReplicationServer replicationServer = null; |
| | | protected ReplicationServer replicationServer; |
| | | /** |
| | | * Specifies the related replication server domain based on baseDn. |
| | | * Specifies the related replication server domain based on baseDN. |
| | | */ |
| | | protected ReplicationServerDomain replicationServerDomain = null; |
| | | protected ReplicationServerDomain replicationServerDomain; |
| | | /** |
| | | * Number of update sent to the server. |
| | | */ |
| | |
| | | */ |
| | | private ServerState serverState; |
| | | /** |
| | | * Specifies the baseDn of the domain. |
| | | * Specifies the baseDN of the domain. |
| | | */ |
| | | private String baseDN = null; |
| | | private DN baseDN; |
| | | /** |
| | | * Specifies whether the consumer is still active or not. |
| | | * If not active, the handler will not return any message. |
| | |
| | | */ |
| | | protected String getBaseDN() |
| | | { |
| | | return baseDN; |
| | | return baseDN.toNormalizedString(); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @exception DirectoryException |
| | | * raised when a problem occurs. |
| | | */ |
| | | protected void setBaseDNAndDomain(String baseDN, boolean isDataServer) |
| | | protected void setBaseDNAndDomain(DN baseDN, boolean isDataServer) |
| | | throws DirectoryException |
| | | { |
| | | if (this.baseDN != null) |
| | | { |
| | | if (!this.baseDN.equalsIgnoreCase(baseDN)) |
| | | if (!this.baseDN.equals(baseDN)) |
| | | { |
| | | Message message = ERR_RS_DN_DOES_NOT_MATCH.get(this.baseDN, baseDN); |
| | | Message message = ERR_RS_DN_DOES_NOT_MATCH.get( |
| | | this.baseDN.toNormalizedString(), baseDN.toNormalizedString()); |
| | | throw new DirectoryException(ResultCode.OTHER, message, null); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | this.baseDN = baseDN; |
| | | if (!baseDN.equalsIgnoreCase("cn=changelog")) |
| | | if (!baseDN.toNormalizedString().equals("cn=changelog")) |
| | | this.replicationServerDomain = getDomain(isDataServer); |
| | | } |
| | | } |
| | |
| | | super("Replication server RS(" |
| | | + replicationServerDomain.getLocalRSServerId() |
| | | + ") monitor publisher for domain \"" |
| | | + replicationServerDomain.getBaseDn() + "\""); |
| | | + replicationServerDomain.getBaseDN() + "\""); |
| | | |
| | | this.domain = replicationServerDomain; |
| | | this.period = period; |
| | |
| | | private String getMessage(String message) |
| | | { |
| | | return "In RS " + domain.getLocalRSServerId() + ", for base dn " |
| | | + domain.getBaseDn() + ": " + message; |
| | | + domain.getBaseDN() + ": " + message; |
| | | } |
| | | } |
| | |
| | | } |
| | | else |
| | | { |
| | | DN baseDN = DN.decode(rsd.getBaseDn() + "," + BASE_DN); |
| | | DN baseDN = DN.decode(rsd.getBaseDN() + "," + BASE_DN); |
| | | for (DN includeBranch : includeBranches) |
| | | { |
| | | if (includeBranch.isDescendantOf(baseDN) |
| | |
| | | TRACER.debugInfo("State=" + serverState); |
| | | Attribute stateAttr = Attributes.create("state", serverState.toString()); |
| | | Attribute genidAttr = Attributes.create("generation-id", |
| | | exportContainer.getGenerationId() + exportContainer.getBaseDn()); |
| | | "" + exportContainer.getGenerationId() + exportContainer.getBaseDN()); |
| | | |
| | | attrs.clear(); |
| | | attrs.put(ocType, singletonList(ocAttr)); |
| | | attrs.put(stateAttr.getAttributeType(), singletonList(stateAttr)); |
| | | attrs.put(genidAttr.getAttributeType(), singletonList(genidAttr)); |
| | | |
| | | final String dnString = exportContainer.getBaseDn() + "," + BASE_DN; |
| | | final String dnString = exportContainer.getBaseDN() + "," + BASE_DN; |
| | | try |
| | | { |
| | | DN dn = DN.decode(dnString); |
| | |
| | | } |
| | | lookthroughCount++; |
| | | writeChange(cursor.getChange(), ldifWriter, searchOperation, |
| | | rsd.getBaseDn(), exportConfig != null); |
| | | rsd.getBaseDN(), exportConfig != null); |
| | | cursor.next(); |
| | | } |
| | | } |
| | |
| | | * Exports one change. |
| | | */ |
| | | private void writeChange(UpdateMsg updateMsg, LDIFWriter ldifWriter, |
| | | SearchOperation searchOperation, String baseDN, boolean isExport) |
| | | SearchOperation searchOperation, DN baseDN, boolean isExport) |
| | | { |
| | | InternalClientConnection conn = |
| | | InternalClientConnection.getRootConnection(); |
| | |
| | | |
| | | addAttribute(entry.getUserAttributes(), CHANGE_NUMBER, |
| | | msg.getCSN().toString()); |
| | | addAttribute(entry.getUserAttributes(), "replicationDomain", baseDN); |
| | | addAttribute(entry.getUserAttributes(), "replicationDomain", |
| | | baseDN.toNormalizedString()); |
| | | |
| | | // Get the base DN, scope, and filter for the search. |
| | | DN searchBaseDN = searchOperation.getBaseDN(); |
| | |
| | | ReplicationServerDomain rsd = iter.next(); |
| | | |
| | | // Skip containers that are not covered by the include branches. |
| | | DN baseDN = DN.decode(rsd.getBaseDn() + "," + BASE_DN); |
| | | DN baseDN = DN.decode(rsd.getBaseDN() + "," + BASE_DN); |
| | | if (searchBaseDN.isDescendantOf(baseDN) |
| | | || searchBaseDN.isAncestorOf(baseDN)) |
| | | { |
| | |
| | | { |
| | | try |
| | | { |
| | | String baseDN = domain.getBaseDN().toNormalizedString(); |
| | | |
| | | // Prevent out of band monitor responses from updating our pending |
| | | // table until we are ready. |
| | | synchronized (pendingMonitorDataLock) |
| | |
| | | { |
| | | // Log a message and do a best effort from here. |
| | | Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get( |
| | | domain.getBaseDn(), serverId, e.getMessage()); |
| | | baseDN, serverId, e.getMessage()); |
| | | logError(message); |
| | | } |
| | | } |
| | |
| | | // error log with repeated messages. |
| | | if (!pendingMonitorDataServerIDs.contains(serverId)) |
| | | { |
| | | logError(NOTE_MONITOR_DATA_RECEIVED.get( |
| | | domain.getBaseDn(), serverId)); |
| | | logError(NOTE_MONITOR_DATA_RECEIVED.get(baseDN, serverId)); |
| | | } |
| | | } |
| | | |
| | |
| | | if (!monitorDataLateServers.contains(serverId)) |
| | | { |
| | | logError(WARN_MISSING_REMOTE_MONITOR_DATA.get( |
| | | domain.getBaseDn(), serverId)); |
| | | baseDN, serverId)); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | // This is a response for an earlier request whose computing is |
| | | // already complete. |
| | | logError(INFO_IGNORING_REMOTE_MONITOR_DATA.get(domain.getBaseDn(), |
| | | msg.getSenderID())); |
| | | logError(INFO_IGNORING_REMOTE_MONITOR_DATA.get( |
| | | domain.getBaseDN().toNormalizedString(), msg.getSenderID())); |
| | | return; |
| | | } |
| | | |
| | |
| | | * This table is used to store the list of dn for which we are currently |
| | | * handling servers. |
| | | */ |
| | | private final Map<String, ReplicationServerDomain> baseDNs = |
| | | new HashMap<String, ReplicationServerDomain>(); |
| | | private final Map<DN, ReplicationServerDomain> baseDNs = |
| | | new HashMap<DN, ReplicationServerDomain>(); |
| | | |
| | | private volatile boolean shutdown = false; |
| | | private int rcvWindow; |
| | |
| | | continue; // Skip: avoid connecting to self. |
| | | } |
| | | |
| | | connect(rsURL, domain.getBaseDn()); |
| | | connect(rsURL, domain.getBaseDN()); |
| | | } |
| | | } |
| | | |
| | |
| | | /** |
| | | * Establish a connection to the server with the address and port. |
| | | * |
| | | * @param remoteServerURL The address and port for the server, separated by a |
| | | * colon. |
| | | * @param baseDn The baseDn of the connection |
| | | * @param remoteServerURL |
| | | * The address and port for the server, separated by a colon. |
| | | * @param baseDN |
| | | * The baseDN of the connection |
| | | */ |
| | | private void connect(String remoteServerURL, String baseDn) |
| | | private void connect(String remoteServerURL, DN baseDN) |
| | | { |
| | | int separator = remoteServerURL.lastIndexOf(':'); |
| | | String port = remoteServerURL.substring(separator + 1); |
| | |
| | | |
| | | ReplicationServerHandler rsHandler = new ReplicationServerHandler( |
| | | session, queueSize, this, rcvWindow); |
| | | rsHandler.connect(baseDn, sslEncryption); |
| | | rsHandler.connect(baseDN, sslEncryption); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | * Get the ReplicationServerDomain associated to the base DN given in |
| | | * parameter. |
| | | * |
| | | * @param baseDn The base Dn for which the ReplicationServerDomain must be |
| | | * @param baseDN |
| | | * The base Dn for which the ReplicationServerDomain must be |
| | | * returned. |
| | | * @return The ReplicationServerDomain associated to the base DN given in |
| | | * parameter. |
| | | */ |
| | | public ReplicationServerDomain getReplicationServerDomain(String baseDn) |
| | | public ReplicationServerDomain getReplicationServerDomain(DN baseDN) |
| | | { |
| | | return getReplicationServerDomain(baseDn, false); |
| | | return getReplicationServerDomain(baseDN, false); |
| | | } |
| | | |
| | | /** |
| | | * Get the ReplicationServerDomain associated to the base DN given in |
| | | * parameter. |
| | | * |
| | | * @param baseDn The base Dn for which the ReplicationServerDomain must be |
| | | * @param baseDN The base Dn for which the ReplicationServerDomain must be |
| | | * returned. |
| | | * @param create Specifies whether to create the ReplicationServerDomain if |
| | | * it does not already exist. |
| | | * @return The ReplicationServerDomain associated to the base DN given in |
| | | * parameter. |
| | | */ |
| | | public ReplicationServerDomain getReplicationServerDomain(String baseDn, |
| | | public ReplicationServerDomain getReplicationServerDomain(DN baseDN, |
| | | boolean create) |
| | | { |
| | | synchronized (baseDNs) |
| | | { |
| | | ReplicationServerDomain domain = baseDNs.get(baseDn); |
| | | ReplicationServerDomain domain = baseDNs.get(baseDN); |
| | | if (domain == null && create) { |
| | | domain = new ReplicationServerDomain(baseDn, this); |
| | | baseDNs.put(baseDn, domain); |
| | | domain = new ReplicationServerDomain(baseDN, this); |
| | | baseDNs.put(baseDN, domain); |
| | | } |
| | | return domain; |
| | | } |
| | |
| | | |
| | | /** |
| | | * Clears the generationId for the replicationServerDomain related to the |
| | | * provided baseDn. |
| | | * provided baseDN. |
| | | * |
| | | * @param baseDn |
| | | * The baseDn for which to delete the generationId. |
| | | * @param baseDN |
| | | * The baseDN for which to delete the generationId. |
| | | */ |
| | | public void clearGenerationId(String baseDn) |
| | | public void clearGenerationId(DN baseDN) |
| | | { |
| | | synchronized (cnIndexDBLock) |
| | | { |
| | |
| | | { |
| | | try |
| | | { |
| | | cnIndexDB.clear(baseDn); |
| | | cnIndexDB.clear(baseDN); |
| | | } |
| | | catch (Exception ignored) |
| | | { |
| | |
| | | * @param baseDN The baseDN of the replicationServerDomain. |
| | | * @return The value of the generationID. |
| | | */ |
| | | public long getGenerationId(String baseDN) |
| | | public long getGenerationId(DN baseDN) |
| | | { |
| | | ReplicationServerDomain rsd = getReplicationServerDomain(baseDN); |
| | | if (rsd!=null) |
| | |
| | | CSN eligibleCSN = null; |
| | | for (ReplicationServerDomain domain : getReplicationServerDomains()) |
| | | { |
| | | if (contains(excludedBaseDNs, domain.getBaseDn())) |
| | | if (contains(excludedBaseDNs, domain.getBaseDN().toNormalizedString())) |
| | | continue; |
| | | |
| | | final CSN domainEligibleCSN = domain.getEligibleCSN(); |
| | |
| | | { |
| | | final String dates = domainEligibleCSN == null ? |
| | | "" : new Date(domainEligibleCSN.getTime()).toString(); |
| | | debugLog += "[baseDN=" + domain.getBaseDn() |
| | | debugLog += "[baseDN=" + domain.getBaseDN() |
| | | + "] [eligibleCSN=" + domainEligibleCSN + ", " + dates + "]"; |
| | | } |
| | | } |
| | |
| | | final CNIndexRecord firstCNRecord = cnIndexDB.getFirstRecord(); |
| | | final CNIndexRecord lastCNRecord = cnIndexDB.getLastRecord(); |
| | | |
| | | Map<String, ServerState> domainsServerStateForLastCN = null; |
| | | boolean noCookieForLastCN = true; |
| | | CSN csnForLastCN = null; |
| | | String domainForLastCN = null; |
| | | DN domainForLastCN = null; |
| | | if (firstCNRecord != null) |
| | | { |
| | | if (lastCNRecord == null) |
| | |
| | | // Get the generalized state associated with the current last change |
| | | // number and initializes from it the startStates table |
| | | String lastCNGenState = lastCNRecord.getPreviousCookie(); |
| | | if (lastCNGenState != null && lastCNGenState.length() > 0) |
| | | { |
| | | domainsServerStateForLastCN = MultiDomainServerState |
| | | .splitGenStateToServerStates(lastCNGenState); |
| | | } |
| | | noCookieForLastCN = lastCNGenState == null |
| | | || lastCNGenState.length() == 0; |
| | | |
| | | csnForLastCN = lastCNRecord.getCSN(); |
| | | domainForLastCN = lastCNRecord.getBaseDN(); |
| | |
| | | long newestDate = 0; |
| | | for (ReplicationServerDomain rsd : getReplicationServerDomains()) |
| | | { |
| | | if (contains(excludedBaseDNs, rsd.getBaseDn())) |
| | | if (contains(excludedBaseDNs, rsd.getBaseDN().toNormalizedString())) |
| | | continue; |
| | | |
| | | // for this domain, have the state in the replchangelog |
| | | // where the last change number update is |
| | | long ec; |
| | | if (domainsServerStateForLastCN == null) |
| | | if (noCookieForLastCN) |
| | | { |
| | | // Count changes of this domain from the beginning of the changelog |
| | | CSN trimCSN = new CSN(rsd.getLatestDomainTrimDate(), 0, 0); |
| | |
| | | CSN csnx = new CSN(newestDate, csnForLastCN.getSeqnum(), 0); |
| | | ec = rsd.getEligibleCount(csnx, crossDomainEligibleCSN); |
| | | |
| | | if (domainForLastCN.equalsIgnoreCase(rsd.getBaseDn())) |
| | | if (domainForLastCN.equals(rsd.getBaseDN())) |
| | | ec--; |
| | | } |
| | | |
| | |
| | | MultiDomainServerState result = new MultiDomainServerState(); |
| | | for (ReplicationServerDomain rsd : getReplicationServerDomains()) |
| | | { |
| | | if (contains(excludedBaseDNs, rsd.getBaseDn()) |
| | | if (contains(excludedBaseDNs, rsd.getBaseDN().toNormalizedString()) |
| | | || rsd.getDbServerState().isEmpty()) |
| | | continue; |
| | | |
| | | result.update(rsd.getBaseDn(), rsd.getEligibleState(getEligibleCSN())); |
| | | result.update(rsd.getBaseDN(), rsd.getEligibleState(getEligibleCSN())); |
| | | } |
| | | return result; |
| | | } |
| | |
| | | */ |
| | | public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg> |
| | | { |
| | | private final String baseDn; |
| | | private final DN baseDN; |
| | | |
| | | /** |
| | | * The Status analyzer that periodically verifies whether the connected DSs |
| | |
| | | private ServerState ctHeartbeatState; |
| | | |
| | | /** |
| | | * Creates a new ReplicationServerDomain associated to the DN baseDn. |
| | | * Creates a new ReplicationServerDomain associated to the baseDN. |
| | | * |
| | | * @param baseDn |
| | | * The baseDn associated to the ReplicationServerDomain. |
| | | * @param baseDN |
| | | * The baseDN associated to the ReplicationServerDomain. |
| | | * @param localReplicationServer |
| | | * the ReplicationServer that created this instance. |
| | | */ |
| | | public ReplicationServerDomain(String baseDn, |
| | | public ReplicationServerDomain(DN baseDN, |
| | | ReplicationServer localReplicationServer) |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.baseDN = baseDN; |
| | | this.localReplicationServer = localReplicationServer; |
| | | this.assuredTimeoutTimer = new Timer("Replication server RS(" |
| | | + localReplicationServer.getServerId() |
| | | + ") assured timer for domain \"" + baseDn + "\"", true); |
| | | + ") assured timer for domain \"" + baseDN + "\"", true); |
| | | this.changelogDB = localReplicationServer.getChangelogDB(); |
| | | |
| | | DirectoryServer.registerMonitorProvider(this); |
| | |
| | | // Unknown assured mode: should never happen |
| | | Message errorMsg = ERR_RS_UNKNOWN_ASSURED_MODE.get( |
| | | Integer.toString(localReplicationServer.getServerId()), |
| | | assuredMode.toString(), baseDn, update.toString()); |
| | | assuredMode.toString(), baseDN.toNormalizedString(), |
| | | update.toString()); |
| | | logError(errorMsg); |
| | | assuredMessage = false; |
| | | } |
| | |
| | | { |
| | | try |
| | | { |
| | | if (this.changelogDB.publishUpdateMsg(baseDn, serverId, updateMsg)) |
| | | if (this.changelogDB.publishUpdateMsg(baseDN, serverId, updateMsg)) |
| | | { |
| | | /* |
| | | * JNR: Matt and I had a hard time figuring out where to put this |
| | |
| | | // Should never happen |
| | | Message errorMsg = ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL.get( |
| | | Integer.toString(localReplicationServer.getServerId()), |
| | | Byte.toString(safeDataLevel), baseDn, update.toString()); |
| | | Byte.toString(safeDataLevel), baseDN.toNormalizedString(), |
| | | update.toString()); |
| | | logError(errorMsg); |
| | | } else if (sourceGroupId == groupId |
| | | // Assured feature does not cross different group IDS |
| | |
| | | mb.append(ERR_RS_ERROR_SENDING_ACK.get( |
| | | Integer.toString(localReplicationServer.getServerId()), |
| | | Integer.toString(origServer.getServerId()), |
| | | csn.toString(), baseDn)); |
| | | csn.toString(), baseDN.toNormalizedString())); |
| | | mb.append(" "); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | |
| | | mb.append(ERR_RS_ERROR_SENDING_ACK.get( |
| | | Integer.toString(localReplicationServer.getServerId()), |
| | | Integer.toString(origServer.getServerId()), |
| | | csn.toString(), baseDn)); |
| | | csn.toString(), baseDN.toNormalizedString())); |
| | | mb.append(" "); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | |
| | | */ |
| | | public Set<Integer> getServerIds() |
| | | { |
| | | return changelogDB.getDomainServerIds(baseDn); |
| | | return changelogDB.getDomainServerIds(baseDN); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN) |
| | | { |
| | | return changelogDB.getCursorFrom(baseDn, serverId, startAfterCSN); |
| | | return changelogDB.getCursorFrom(baseDN, serverId, startAfterCSN); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public long getCount(int serverId, CSN from, CSN to) |
| | | { |
| | | return changelogDB.getCount(baseDn, serverId, from, to); |
| | | return changelogDB.getCount(baseDN, serverId, from, to); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public long getChangesCount() |
| | | { |
| | | return changelogDB.getDomainChangesCount(baseDn); |
| | | return changelogDB.getDomainChangesCount(baseDN); |
| | | } |
| | | |
| | | /** |
| | | * Get the baseDn. |
| | | * @return Returns the baseDn. |
| | | * Get the baseDN. |
| | | * |
| | | * @return Returns the baseDN. |
| | | */ |
| | | public String getBaseDn() |
| | | public DN getBaseDN() |
| | | { |
| | | return baseDn; |
| | | return baseDN; |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get( |
| | | this.baseDn, Integer.toString(msg.getDestination()))); |
| | | baseDN.toNormalizedString(), Integer.toString(msg.getDestination()))); |
| | | mb.append(" In Replication Server=").append( |
| | | this.localReplicationServer.getMonitorInstanceName()); |
| | | mb.append(" unroutable message =").append(msg.getClass().getSimpleName()); |
| | |
| | | */ |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get( |
| | | this.baseDn, Integer.toString(msg.getDestination()))); |
| | | baseDN.toNormalizedString(), |
| | | Integer.toString(msg.getDestination()))); |
| | | mb.append(" unroutable message =" + msg.getClass().getSimpleName()); |
| | | mb.append(" Details: " + ioe.getLocalizedMessage()); |
| | | final Message message = mb.toMessage(); |
| | |
| | | |
| | | stopAllServers(true); |
| | | |
| | | changelogDB.shutdownDomain(baseDn); |
| | | changelogDB.shutdownDomain(baseDN); |
| | | } |
| | | |
| | | /** |
| | |
| | | public ServerState getDbServerState() |
| | | { |
| | | ServerState serverState = new ServerState(); |
| | | for (CSN lastCSN : changelogDB.getDomainLastCSNs(baseDn).values()) |
| | | for (CSN lastCSN : changelogDB.getDomainLastCSNs(baseDN).values()) |
| | | { |
| | | serverState.update(lastCSN); |
| | | } |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "ReplicationServerDomain " + baseDn; |
| | | return "ReplicationServerDomain " + baseDN; |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | if (i == 2) |
| | | { |
| | | Message message = |
| | | ERR_EXCEPTION_SENDING_TOPO_INFO |
| | | .get(baseDn, "directory", Integer.toString(dsHandler |
| | | .getServerId()), e.getMessage()); |
| | | Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get( |
| | | baseDN.toNormalizedString(), "directory", |
| | | Integer.toString(dsHandler.getServerId()), e.getMessage()); |
| | | logError(message); |
| | | } |
| | | } |
| | |
| | | if (i == 2) |
| | | { |
| | | Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get( |
| | | baseDn, "replication", |
| | | baseDN.toNormalizedString(), "replication", |
| | | Integer.toString(rsHandler.getServerId()), e.getMessage()); |
| | | logError(message); |
| | | } |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + this + " Receiving ResetGenerationIdMsg from " |
| | | + senderHandler.getServerId() + " for baseDn " + baseDn + ":\n" |
| | | + genIdMsg); |
| | | debug("Receiving ResetGenerationIdMsg from " |
| | | + senderHandler.getServerId() + ":\n" + genIdMsg); |
| | | } |
| | | |
| | | try |
| | |
| | | // Order to take a gen id we already have, just ignore |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + this |
| | | + " Reset generation id requested for baseDn " + baseDn |
| | | + " but generation id was already " + this.generationId + ":\n" |
| | | + genIdMsg); |
| | | debug("Reset generation id requested but generationId was already " |
| | | + this.generationId + ":\n" + genIdMsg); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | } catch (IOException e) |
| | | { |
| | | logError(ERR_EXCEPTION_FORWARDING_RESET_GEN_ID.get(baseDn, |
| | | e.getMessage())); |
| | | logError(ERR_EXCEPTION_FORWARDING_RESET_GEN_ID.get( |
| | | baseDN.toNormalizedString(), e.getMessage())); |
| | | } |
| | | } |
| | | |
| | |
| | | dsHandler.changeStatusForResetGenId(newGenId); |
| | | } catch (IOException e) |
| | | { |
| | | logError(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID.get(baseDn, |
| | | logError(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID.get( |
| | | baseDN.toNormalizedString(), |
| | | Integer.toString(dsHandler.getServerId()), |
| | | e.getMessage())); |
| | | } |
| | |
| | | // treatment. |
| | | sendTopoInfoToAll(); |
| | | |
| | | logError(NOTE_RESET_GENERATION_ID.get(baseDn, newGenId)); |
| | | logError(NOTE_RESET_GENERATION_ID.get(baseDN.toNormalizedString(), |
| | | newGenId)); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | |
| | | sendTopoInfoToAllExcept(senderHandler); |
| | | |
| | | Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get( |
| | | senderHandler.getServerId(), baseDn, newStatus.toString()); |
| | | senderHandler.getServerId(), baseDN.toNormalizedString(), |
| | | newStatus.toString()); |
| | | logError(message); |
| | | } |
| | | catch(Exception e) |
| | |
| | | // StatusAnalyzer. |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Status analyzer for domain " + baseDn |
| | | TRACER.debugInfo("Status analyzer for domain " + baseDN |
| | | + " has been interrupted when" |
| | | + " trying to acquire domain lock for changing the status of DS " |
| | | + dsHandler.getServerId()); |
| | |
| | | catch (IOException e) |
| | | { |
| | | logError(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER |
| | | .get(baseDn, |
| | | .get(baseDN.toNormalizedString(), |
| | | Integer.toString(dsHandler.getServerId()), |
| | | e.getMessage())); |
| | | } |
| | |
| | | public void clearDbs() |
| | | { |
| | | // Reset the localchange and state db for the current domain |
| | | changelogDB.clearDomain(baseDn); |
| | | changelogDB.clearDomain(baseDN); |
| | | try |
| | | { |
| | | localReplicationServer.clearGenerationId(baseDn); |
| | | localReplicationServer.clearGenerationId(baseDN); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | rsHandler.getServerId(), |
| | | rsHandler.session.getReadableRemoteAddress(), |
| | | rsHandler.getGenerationId(), |
| | | baseDn, getLocalRSServerId(), generationId); |
| | | baseDN.toNormalizedString(), getLocalRSServerId(), generationId); |
| | | logError(message); |
| | | |
| | | ErrorMsg errorMsg = new ErrorMsg(getLocalRSServerId(), |
| | |
| | | { |
| | | return "Replication server RS(" + localReplicationServer.getServerId() |
| | | + ") " + localReplicationServer.getServerURL() + ",cn=" |
| | | + baseDn.replace(',', '_').replace('=', '_') + ",cn=Replication"; |
| | | + baseDN.toNormalizedString().replace(',', '_').replace('=', '_') |
| | | + ",cn=Replication"; |
| | | } |
| | | |
| | | /** |
| | |
| | | String.valueOf(localReplicationServer.getServerId()))); |
| | | attributes.add(Attributes.create("replication-server-port", |
| | | String.valueOf(localReplicationServer.getReplicationPort()))); |
| | | attributes.add(Attributes.create("domain-name", baseDn)); |
| | | attributes.add(Attributes.create("domain-name", |
| | | baseDN.toNormalizedString())); |
| | | attributes.add(Attributes.create("generation-id", |
| | | baseDn + " " + generationId)); |
| | | baseDN + " " + generationId)); |
| | | |
| | | // Missing changes |
| | | long missingChanges = getDomainMonitorData().getMissingChangesRS( |
| | |
| | | if (eligibleCSN.olderOrEqual(mostRecentDbCSN)) |
| | | { |
| | | // let's try to seek the first change <= eligibleCSN |
| | | CSN newCSN = changelogDB.getCSNAfter(baseDn, serverId, eligibleCSN); |
| | | CSN newCSN = changelogDB.getCSNAfter(baseDN, serverId, eligibleCSN); |
| | | result.update(newCSN); |
| | | } else { |
| | | // for this serverId, all changes in the ChangelogDb are holder |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER |
| | | .debugInfo("In " + this + " getEligibleState() result is " + result); |
| | | debug("getEligibleState() result is " + result); |
| | | } |
| | | return result; |
| | | } |
| | |
| | | public ServerState getStartState() |
| | | { |
| | | ServerState domainStartState = new ServerState(); |
| | | for (CSN firstCSN : changelogDB.getDomainFirstCSNs(baseDn).values()) |
| | | for (CSN firstCSN : changelogDB.getDomainFirstCSNs(baseDN).values()) |
| | | { |
| | | domainStartState.update(firstCSN); |
| | | } |
| | |
| | | CSN eligibleCSN = null; |
| | | |
| | | for (Entry<Integer, CSN> entry : |
| | | changelogDB.getDomainLastCSNs(baseDn).entrySet()) |
| | | changelogDB.getDomainLastCSNs(baseDN).entrySet()) |
| | | { |
| | | // Consider this producer (DS/db). |
| | | final int serverId = entry.getKey(); |
| | |
| | | logError(ERR_CHANGELOG_ERROR_SENDING_MSG |
| | | .get("Replication Server " |
| | | + localReplicationServer.getReplicationPort() + " " |
| | | + baseDn + " " + localReplicationServer.getServerId())); |
| | | + baseDN + " " + localReplicationServer.getServerId())); |
| | | stopServer(rsHandler, false); |
| | | } |
| | | } |
| | |
| | | */ |
| | | public long getLatestDomainTrimDate() |
| | | { |
| | | return changelogDB.getDomainLatestTrimDate(baseDn); |
| | | return changelogDB.getDomainLatestTrimDate(baseDN); |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | private void debug(String message) |
| | | { |
| | | TRACER.debugInfo("In RS serverId=" + localReplicationServer.getServerId() |
| | | + " for baseDn=" + baseDn + " and port=" |
| | | + localReplicationServer.getReplicationPort() + ": " + message); |
| | | TRACER.debugInfo("In ReplicationServerDomain serverId=" |
| | | + localReplicationServer.getServerId() + " for baseDN=" + baseDN |
| | | + " and port=" + localReplicationServer.getReplicationPort() |
| | | + ": " + message); |
| | | } |
| | | } |
| | |
| | | serverURL = inReplServerStartMsg.getServerURL(); |
| | | final String port = serverURL.substring(serverURL.lastIndexOf(':') + 1); |
| | | serverAddressURL = session.getRemoteAddress() + ":" + port; |
| | | setBaseDNAndDomain(inReplServerStartMsg.getBaseDn(), false); |
| | | DN baseDN = DN.decode(inReplServerStartMsg.getBaseDn()); |
| | | setBaseDNAndDomain(baseDN, false); |
| | | setInitialServerState(inReplServerStartMsg.getServerState()); |
| | | setSendWindowSize(inReplServerStartMsg.getWindowSize()); |
| | | if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1) |
| | |
| | | * @param sslEncryption The sslEncryption requested to the remote RS. |
| | | * @throws DirectoryException when an error occurs. |
| | | */ |
| | | public void connect(String baseDN, boolean sslEncryption) |
| | | public void connect(DN baseDN, boolean sslEncryption) |
| | | throws DirectoryException |
| | | { |
| | | // we are the initiator and decides of the encryption |
| | |
| | | |
| | | Message message = INFO_REPLICATION_SERVER_CONNECTION_TO_RS |
| | | .get(getReplicationServerId(), getServerId(), |
| | | replicationServerDomain.getBaseDn(), |
| | | replicationServerDomain.getBaseDN().toNormalizedString(), |
| | | session.getReadableRemoteAddress()); |
| | | logError(message); |
| | | |
| | |
| | | |
| | | Message message = INFO_REPLICATION_SERVER_CONNECTION_FROM_RS |
| | | .get(getReplicationServerId(), getServerId(), |
| | | replicationServerDomain.getBaseDn(), |
| | | replicationServerDomain.getBaseDN().toNormalizedString(), |
| | | session.getReadableRemoteAddress()); |
| | | logError(message); |
| | | |
| | |
| | | if (serverId != 0) |
| | | { |
| | | return "Replication server RS(" + serverId + ") for domain \"" |
| | | + replicationServerDomain.getBaseDn() + "\""; |
| | | + replicationServerDomain.getBaseDN() + "\""; |
| | | } |
| | | return "Unknown server"; |
| | | } |
| | |
| | | if (replicationServerDomain != null) |
| | | replicationServerDomain.receiveTopoInfoFromRS(topoMsg, this, true); |
| | | } |
| | | |
| | | } |
| | |
| | | { |
| | | super("Replication server RS(" |
| | | + replicationServerDomain.getLocalRSServerId() |
| | | + ") delay monitor for domain \"" + replicationServerDomain.getBaseDn() |
| | | + ") delay monitor for domain \"" + replicationServerDomain.getBaseDN() |
| | | + "\""); |
| | | |
| | | this.replicationServerDomain = replicationServerDomain; |
| | |
| | | private String getMessage(String message) |
| | | { |
| | | return "In RS " + replicationServerDomain.getLocalRSServerId() |
| | | + ", for base dn " + replicationServerDomain.getBaseDn() + ": " |
| | | + ", for baseDN=" + replicationServerDomain.getBaseDN() + ": " |
| | | + message; |
| | | } |
| | | |
| | |
| | | package org.opends.server.replication.server.changelog.api; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.types.DN; |
| | | |
| | | /** |
| | | * The Change Number Index Data class represents records stored in the |
| | |
| | | /** This is the key used to store the rest of the . */ |
| | | private long changeNumber; |
| | | private String previousCookie; |
| | | private String baseDN; |
| | | private DN baseDN; |
| | | private CSN csn; |
| | | |
| | | /** |
| | |
| | | * @param csn |
| | | * the replication CSN field |
| | | */ |
| | | public CNIndexRecord(long changeNumber, String previousCookie, String baseDN, |
| | | public CNIndexRecord(long changeNumber, String previousCookie, DN baseDN, |
| | | CSN csn) |
| | | { |
| | | super(); |
| | |
| | | * |
| | | * @return the baseDN |
| | | */ |
| | | public String getBaseDN() |
| | | public DN getBaseDN() |
| | | { |
| | | return baseDN; |
| | | } |
| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.api; |
| | | |
| | | import org.opends.server.types.DN; |
| | | |
| | | /** |
| | | * This class stores an index of all the changes seen by this server in the form |
| | | * of {@link CNIndexRecord}s. The records are sorted by a global ordering as |
| | |
| | | * @throws ChangelogException |
| | | * if a database problem occurs. |
| | | */ |
| | | void clear(String baseDNToClear) throws ChangelogException; |
| | | void clear(DN baseDNToClear) throws ChangelogException; |
| | | |
| | | /** |
| | | * Shutdown this DB. |
| | |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.types.DN; |
| | | |
| | | /** |
| | | * The changelogDB stores the replication data on persistent storage. |
| | |
| | | * Returns the serverIds for the servers that are or have been part of the |
| | | * provided replication domain. |
| | | * |
| | | * @param baseDn |
| | | * the replication domain baseDn |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | * @return a set of integers holding the serverIds |
| | | */ |
| | | Set<Integer> getDomainServerIds(String baseDn); |
| | | Set<Integer> getDomainServerIds(DN baseDN); |
| | | |
| | | /** |
| | | * Get the number of changes for the specified replication domain. |
| | | * |
| | | * @param baseDn |
| | | * the replication domain baseDn |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | * @return the number of changes. |
| | | */ |
| | | long getDomainChangesCount(String baseDn); |
| | | long getDomainChangesCount(DN baseDN); |
| | | |
| | | /** |
| | | * Returns the FIRST {@link CSN}s of each serverId for the specified |
| | | * replication domain. |
| | | * |
| | | * @param baseDn |
| | | * the replication domain baseDn |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | * @return a {serverId => FIRST CSN} Map |
| | | */ |
| | | Map<Integer, CSN> getDomainFirstCSNs(String baseDn); |
| | | Map<Integer, CSN> getDomainFirstCSNs(DN baseDN); |
| | | |
| | | /** |
| | | * Returns the LAST {@link CSN}s of each serverId for the specified |
| | | * replication domain. |
| | | * |
| | | * @param baseDn |
| | | * the replication domain baseDn |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | * @return a {serverId => LAST CSN} Map |
| | | */ |
| | | Map<Integer, CSN> getDomainLastCSNs(String baseDn); |
| | | Map<Integer, CSN> getDomainLastCSNs(DN baseDN); |
| | | |
| | | /** |
| | | * Retrieves the latest trim date for the specified replication domain. |
| | | * |
| | | * @param baseDn |
| | | * the replication domain baseDn |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | * @return the domain latest trim date |
| | | */ |
| | | long getDomainLatestTrimDate(String baseDn); |
| | | long getDomainLatestTrimDate(DN baseDN); |
| | | |
| | | /** |
| | | * Shutdown the specified replication domain. |
| | | * |
| | | * @param baseDn |
| | | * the replication domain baseDn |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | */ |
| | | void shutdownDomain(String baseDn); |
| | | void shutdownDomain(DN baseDN); |
| | | |
| | | /** |
| | | * Clear DB and shutdown for the specified replication domain. |
| | | * |
| | | * @param baseDn |
| | | * the replication domain baseDn |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | */ |
| | | void clearDomain(String baseDn); |
| | | void clearDomain(DN baseDN); |
| | | |
| | | // serverId methods |
| | | |
| | |
| | | * Return the number of changes between 2 provided {@link CSN}s for the |
| | | * specified serverId and replication domain. |
| | | * |
| | | * @param baseDn |
| | | * the replication domain baseDn |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | * @param serverId |
| | | * the serverId on which to act |
| | | * @param from |
| | |
| | | * The upper (newer) CSN |
| | | * @return The computed number of changes |
| | | */ |
| | | long getCount(String baseDn, int serverId, CSN from, CSN to); |
| | | long getCount(DN baseDN, int serverId, CSN from, CSN to); |
| | | |
| | | /** |
| | | * Returns the {@link CSN} situated immediately after the specified |
| | | * {@link CSN} for the specified serverId and replication domain. |
| | | * |
| | | * @param baseDn |
| | | * the replication domain baseDn |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | * @param serverId |
| | | * the serverId for which we want the information |
| | | * @param startAfterCSN |
| | |
| | | * @return a new ReplicationIterator that allows to browse the db managed by |
| | | * this dbHandler and starting at the position defined by a given CSN. |
| | | */ |
| | | CSN getCSNAfter(String baseDn, int serverId, CSN startAfterCSN); |
| | | CSN getCSNAfter(DN baseDN, int serverId, CSN startAfterCSN); |
| | | |
| | | /** |
| | | * Generates a non empty {@link ReplicaDBCursor} for the specified serverId |
| | | * and replication domain. |
| | | * |
| | | * @param baseDn |
| | | * the replication domain baseDn |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | * @param serverId |
| | | * the serverId on which to act |
| | | * @param startAfterCSN |
| | |
| | | * @return a {@link ReplicaDBCursor} if the ReplicaDB is not empty, null |
| | | * otherwise |
| | | */ |
| | | ReplicaDBCursor getCursorFrom(String baseDn, int serverId, CSN startAfterCSN); |
| | | ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN); |
| | | |
| | | /** |
| | | * for the specified serverId and replication domain. |
| | | * |
| | | * @param baseDn |
| | | * the replication domain baseDn |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | * @param serverId |
| | | * the serverId on which to act |
| | | * @param updateMsg |
| | |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | boolean publishUpdateMsg(String baseDn, int serverId, UpdateMsg updateMsg) |
| | | boolean publishUpdateMsg(DN baseDN, int serverId, UpdateMsg updateMsg) |
| | | throws ChangelogException; |
| | | |
| | | } |
| | |
| | | import org.opends.server.replication.server.changelog.je.ReplicationDB.*; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.InitializationException; |
| | | import org.opends.server.util.TimeThread; |
| | | |
| | |
| | | private CSN firstChange; |
| | | private CSN lastChange; |
| | | private int serverId; |
| | | private String baseDn; |
| | | private DN baseDN; |
| | | private DbMonitorProvider dbMonitor = new DbMonitorProvider(); |
| | | private DirectoryThread thread; |
| | | private final Object flushLock = new Object(); |
| | |
| | | * Creates a new dbHandler associated to a given LDAP server. |
| | | * |
| | | * @param id Identifier of the DB. |
| | | * @param baseDn the baseDn for which this DB was created. |
| | | * @param baseDN the baseDN for which this DB was created. |
| | | * @param replicationServer The ReplicationServer that creates this dbHandler. |
| | | * @param dbenv the Database Env to use to create the ReplicationServer DB. |
| | | * server for this domain. |
| | | * @param queueSize The queueSize to use when creating the dbHandler. |
| | | * @throws ChangelogException If a database problem happened |
| | | */ |
| | | public DbHandler( |
| | | int id, String baseDn, ReplicationServer replicationServer, |
| | | ReplicationDbEnv dbenv, int queueSize) |
| | | throws ChangelogException |
| | | public DbHandler(int id, DN baseDN, ReplicationServer replicationServer, |
| | | ReplicationDbEnv dbenv, int queueSize) throws ChangelogException |
| | | { |
| | | this.replicationServer = replicationServer; |
| | | serverId = id; |
| | | this.baseDn = baseDn; |
| | | this.baseDN = baseDN; |
| | | trimAge = replicationServer.getTrimAge(); |
| | | queueMaxSize = queueSize; |
| | | queueLowmark = queueSize / 5; |
| | |
| | | queueMaxBytes = 200 * queueMaxSize; |
| | | queueLowmarkBytes = 200 * queueLowmark; |
| | | queueHimarkBytes = 200 * queueLowmark; |
| | | db = new ReplicationDB(id, baseDn, replicationServer, dbenv); |
| | | db = new ReplicationDB(id, baseDN, replicationServer, dbenv); |
| | | firstChange = db.readFirstChange(); |
| | | lastChange = db.readLastChange(); |
| | | thread = new DirectoryThread(this, "Replication server RS(" |
| | | + replicationServer.getServerId() |
| | | + ") changelog checkpointer for Replica DS(" + id |
| | | + ") for domain \"" + baseDn + "\""); |
| | | + ") for domain \"" + baseDN + "\""); |
| | | thread.start(); |
| | | |
| | | DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | |
| | | List<Attribute> attributes = new ArrayList<Attribute>(); |
| | | attributes.add(Attributes.create("replicationServer-database", |
| | | String.valueOf(serverId))); |
| | | attributes.add(Attributes.create("domain-name", baseDn)); |
| | | attributes.add(Attributes.create("domain-name", |
| | | baseDN.toNormalizedString())); |
| | | if (firstChange != null) |
| | | { |
| | | attributes.add(Attributes.create("first-change", encode(firstChange))); |
| | |
| | | public String getMonitorInstanceName() |
| | | { |
| | | ReplicationServerDomain domain = replicationServer |
| | | .getReplicationServerDomain(baseDn); |
| | | .getReplicationServerDomain(baseDN); |
| | | return "Changelog for DS(" + serverId + "),cn=" |
| | | + domain.getMonitorInstanceName(); |
| | | } |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return baseDn + " " + serverId + " " + firstChange + " " + lastChange; |
| | | return baseDN + " " + serverId + " " + firstChange + " " + lastChange; |
| | | } |
| | | |
| | | /** |
| | |
| | | final long changeNumber = record.getChangeNumber(); |
| | | DatabaseEntry key = new ReplicationDraftCNKey(changeNumber); |
| | | DatabaseEntry data = new DraftCNData(changeNumber, |
| | | record.getPreviousCookie(), record.getBaseDN(), record.getCSN()); |
| | | record.getPreviousCookie(), record.getBaseDN().toNormalizedString(), |
| | | record.getCSN()); |
| | | |
| | | // Use a transaction so that we can override durability. |
| | | Transaction txn = null; |
| | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.server.changelog.api.CNIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | |
| | | import com.sleepycat.je.DatabaseEntry; |
| | | |
| | |
| | | { |
| | | String stringData = new String(data, "UTF-8"); |
| | | String[] str = stringData.split(FIELD_SEPARATOR, 3); |
| | | return new CNIndexRecord(changeNumber, str[0], str[1], new CSN(str[2])); |
| | | final DN baseDN = DN.decode(str[1]); |
| | | return new CNIndexRecord(changeNumber, str[0], baseDN, new CSN(str[2])); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | |
| | | // TODO: i18n |
| | | throw new ChangelogException(Message.raw("need UTF-8 support")); |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | throw new ChangelogException(e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | import org.opends.server.replication.server.ReplicationServerDomain; |
| | | import org.opends.server.replication.server.changelog.api.*; |
| | | import org.opends.server.replication.server.changelog.je.DraftCNDB.*; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.types.InitializationException; |
| | | import org.opends.server.types.*; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void clear(String baseDNToClear) throws ChangelogException |
| | | public void clear(DN baseDNToClear) throws ChangelogException |
| | | { |
| | | if (isEmpty()) |
| | | { |
| | |
| | | |
| | | // From the draftCNDb change record, get the domain and CSN |
| | | final CNIndexRecord record = cursor.currentRecord(); |
| | | final String baseDN = record.getBaseDN(); |
| | | if (baseDNToClear != null && baseDNToClear.equalsIgnoreCase(baseDN)) |
| | | if (baseDNToClear != null && baseDNToClear.equals(record.getBaseDN())) |
| | | { |
| | | cursor.delete(); |
| | | continue; |
| | | } |
| | | |
| | | final ReplicationServerDomain domain = |
| | | replicationServer.getReplicationServerDomain(baseDN); |
| | | replicationServer.getReplicationServerDomain(record.getBaseDN()); |
| | | if (domain == null) |
| | | { |
| | | // the domain has been removed since the record was written in the |
| | |
| | | ServerState csnVector; |
| | | try |
| | | { |
| | | Map<String, ServerState> csnStartStates = |
| | | Map<DN, ServerState> csnStartStates = |
| | | MultiDomainServerState.splitGenStateToServerStates( |
| | | record.getPreviousCookie()); |
| | | csnVector = csnStartStates.get(baseDN); |
| | | csnVector = csnStartStates.get(record.getBaseDN()); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("DraftCNDBHandler:clear() - ChangeVector:" |
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.ReplicaDBCursor; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.util.Pair; |
| | | |
| | |
| | | /** |
| | | * This map contains the List of updates received from each LDAP server. |
| | | */ |
| | | private final Map<String, Map<Integer, DbHandler>> sourceDbHandlers = |
| | | new ConcurrentHashMap<String, Map<Integer, DbHandler>>(); |
| | | private final Map<DN, Map<Integer, DbHandler>> sourceDbHandlers = |
| | | new ConcurrentHashMap<DN, Map<Integer, DbHandler>>(); |
| | | private ReplicationDbEnv dbEnv; |
| | | private String dbDirName = null; |
| | | private File dbDirectory; |
| | |
| | | this.replicationServer = replicationServer; |
| | | } |
| | | |
| | | private Map<Integer, DbHandler> getDomainMap(String baseDn) |
| | | private Map<Integer, DbHandler> getDomainMap(DN baseDN) |
| | | { |
| | | final Map<Integer, DbHandler> domainMap = sourceDbHandlers.get(baseDn); |
| | | final Map<Integer, DbHandler> domainMap = sourceDbHandlers.get(baseDN); |
| | | if (domainMap != null) |
| | | { |
| | | return domainMap; |
| | |
| | | return Collections.emptyMap(); |
| | | } |
| | | |
| | | private DbHandler getDbHandler(String baseDn, int serverId) |
| | | private DbHandler getDbHandler(DN baseDN, int serverId) |
| | | { |
| | | return getDomainMap(baseDn).get(serverId); |
| | | return getDomainMap(baseDN).get(serverId); |
| | | } |
| | | |
| | | /** |
| | | * Provision resources for the specified serverId in the specified replication |
| | | * domain. |
| | | * |
| | | * @param baseDn |
| | | * @param baseDN |
| | | * the replication domain where to add the serverId |
| | | * @param serverId |
| | | * the server Id to add to the replication domain |
| | | * @throws ChangelogException |
| | | * If a database error happened. |
| | | */ |
| | | private void commission(String baseDn, int serverId, ReplicationServer rs) |
| | | private void commission(DN baseDN, int serverId, ReplicationServer rs) |
| | | throws ChangelogException |
| | | { |
| | | getOrCreateDbHandler(baseDn, serverId, rs); |
| | | getOrCreateDbHandler(baseDN, serverId, rs); |
| | | } |
| | | |
| | | private Pair<DbHandler, Boolean> getOrCreateDbHandler(String baseDn, |
| | | private Pair<DbHandler, Boolean> getOrCreateDbHandler(DN baseDN, |
| | | int serverId, ReplicationServer rs) throws ChangelogException |
| | | { |
| | | synchronized (sourceDbHandlers) |
| | | { |
| | | Map<Integer, DbHandler> domainMap = sourceDbHandlers.get(baseDn); |
| | | Map<Integer, DbHandler> domainMap = sourceDbHandlers.get(baseDN); |
| | | if (domainMap == null) |
| | | { |
| | | domainMap = new ConcurrentHashMap<Integer, DbHandler>(); |
| | | sourceDbHandlers.put(baseDn, domainMap); |
| | | sourceDbHandlers.put(baseDN, domainMap); |
| | | } |
| | | |
| | | DbHandler dbHandler = domainMap.get(serverId); |
| | | if (dbHandler == null) |
| | | { |
| | | dbHandler = |
| | | new DbHandler(serverId, baseDn, rs, dbEnv, rs.getQueueSize()); |
| | | new DbHandler(serverId, baseDN, rs, dbEnv, rs.getQueueSize()); |
| | | domainMap.put(serverId, dbHandler); |
| | | return Pair.of(dbHandler, true); |
| | | } |
| | |
| | | private void initializeChangelogState(final ChangelogState changelogState) |
| | | throws ChangelogException |
| | | { |
| | | for (Map.Entry<String, Long> entry : |
| | | for (Map.Entry<DN, Long> entry : |
| | | changelogState.getDomainToGenerationId().entrySet()) |
| | | { |
| | | replicationServer.getReplicationServerDomain(entry.getKey(), true) |
| | | .initGenerationID(entry.getValue()); |
| | | } |
| | | for (Map.Entry<String, List<Integer>> entry : changelogState |
| | | .getDomainToServerIds().entrySet()) |
| | | for (Map.Entry<DN, List<Integer>> entry : |
| | | changelogState.getDomainToServerIds().entrySet()) |
| | | { |
| | | final String baseDn = entry.getKey(); |
| | | for (int serverId : entry.getValue()) |
| | | { |
| | | commission(baseDn, serverId, replicationServer); |
| | | commission(entry.getKey(), serverId, replicationServer); |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Set<Integer> getDomainServerIds(String baseDn) |
| | | public Set<Integer> getDomainServerIds(DN baseDN) |
| | | { |
| | | return getDomainMap(baseDn).keySet(); |
| | | return getDomainMap(baseDN).keySet(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getCount(String baseDn, int serverId, CSN from, CSN to) |
| | | public long getCount(DN baseDN, int serverId, CSN from, CSN to) |
| | | { |
| | | DbHandler dbHandler = getDbHandler(baseDn, serverId); |
| | | DbHandler dbHandler = getDbHandler(baseDN, serverId); |
| | | if (dbHandler != null) |
| | | { |
| | | return dbHandler.getCount(from, to); |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getDomainChangesCount(String baseDn) |
| | | public long getDomainChangesCount(DN baseDN) |
| | | { |
| | | long entryCount = 0; |
| | | for (DbHandler dbHandler : getDomainMap(baseDn).values()) |
| | | for (DbHandler dbHandler : getDomainMap(baseDN).values()) |
| | | { |
| | | entryCount += dbHandler.getChangesCount(); |
| | | } |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void shutdownDomain(String baseDn) |
| | | public void shutdownDomain(DN baseDN) |
| | | { |
| | | shutdownDbHandlers(getDomainMap(baseDn)); |
| | | shutdownDbHandlers(getDomainMap(baseDN)); |
| | | } |
| | | |
| | | private void shutdownDbHandlers(Map<Integer, DbHandler> domainMap) |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Map<Integer, CSN> getDomainFirstCSNs(String baseDn) |
| | | public Map<Integer, CSN> getDomainFirstCSNs(DN baseDN) |
| | | { |
| | | final Map<Integer, DbHandler> domainMap = getDomainMap(baseDn); |
| | | final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN); |
| | | final Map<Integer, CSN> results = |
| | | new HashMap<Integer, CSN>(domainMap.size()); |
| | | for (DbHandler dbHandler : domainMap.values()) |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Map<Integer, CSN> getDomainLastCSNs(String baseDn) |
| | | public Map<Integer, CSN> getDomainLastCSNs(DN baseDN) |
| | | { |
| | | final Map<Integer, DbHandler> domainMap = getDomainMap(baseDn); |
| | | final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN); |
| | | final Map<Integer, CSN> results = |
| | | new HashMap<Integer, CSN>(domainMap.size()); |
| | | for (DbHandler dbHandler : domainMap.values()) |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void clearDomain(String baseDn) |
| | | public void clearDomain(DN baseDN) |
| | | { |
| | | final Map<Integer, DbHandler> domainMap = getDomainMap(baseDn); |
| | | final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN); |
| | | synchronized (domainMap) |
| | | { |
| | | for (DbHandler dbHandler : domainMap.values()) |
| | |
| | | |
| | | try |
| | | { |
| | | dbEnv.clearGenerationId(baseDn); |
| | | dbEnv.clearGenerationId(baseDN); |
| | | } |
| | | catch (Exception ignored) |
| | | { |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getDomainLatestTrimDate(String baseDn) |
| | | public long getDomainLatestTrimDate(DN baseDN) |
| | | { |
| | | long latest = 0; |
| | | for (DbHandler dbHandler : getDomainMap(baseDn).values()) |
| | | for (DbHandler dbHandler : getDomainMap(baseDN).values()) |
| | | { |
| | | if (latest == 0 || latest < dbHandler.getLatestTrimDate()) |
| | | { |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public CSN getCSNAfter(String baseDn, int serverId, CSN startAfterCSN) |
| | | public CSN getCSNAfter(DN baseDN, int serverId, CSN startAfterCSN) |
| | | { |
| | | final DbHandler dbHandler = getDbHandler(baseDn, serverId); |
| | | final DbHandler dbHandler = getDbHandler(baseDN, serverId); |
| | | |
| | | ReplicaDBCursor cursor = null; |
| | | try |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ReplicaDBCursor getCursorFrom(String baseDn, int serverId, |
| | | public ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, |
| | | CSN startAfterCSN) |
| | | { |
| | | DbHandler dbHandler = getDbHandler(baseDn, serverId); |
| | | DbHandler dbHandler = getDbHandler(baseDN, serverId); |
| | | if (dbHandler == null) |
| | | { |
| | | return null; |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean publishUpdateMsg(String baseDn, int serverId, |
| | | public boolean publishUpdateMsg(DN baseDN, int serverId, |
| | | UpdateMsg updateMsg) throws ChangelogException |
| | | { |
| | | final Pair<DbHandler, Boolean> pair = |
| | | getOrCreateDbHandler(baseDn, serverId, replicationServer); |
| | | getOrCreateDbHandler(baseDN, serverId, replicationServer); |
| | | final DbHandler dbHandler = pair.getFirst(); |
| | | final boolean wasCreated = pair.getSecond(); |
| | | |
| | |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.ReplicationServerDomain; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import com.sleepycat.je.*; |
| | |
| | | private ReplicationDbEnv dbenv; |
| | | private ReplicationServer replicationServer; |
| | | private int serverId; |
| | | private String baseDn; |
| | | private DN baseDN; |
| | | |
| | | /** |
| | | * The lock used to provide exclusive access to the thread that close the db |
| | |
| | | * Creates a new database or open existing database that will be used |
| | | * to store and retrieve changes from an LDAP server. |
| | | * @param serverId The identifier of the LDAP server. |
| | | * @param baseDn The baseDn of the replication domain. |
| | | * @param baseDN The baseDN of the replication domain. |
| | | * @param replicationServer The ReplicationServer that needs to be shutdown. |
| | | * @param dbenv The Db environment to use to create the db. |
| | | * @throws ChangelogException If a database problem happened. |
| | | */ |
| | | public ReplicationDB(int serverId, String baseDn, |
| | | ReplicationServer replicationServer, |
| | | ReplicationDbEnv dbenv) |
| | | public ReplicationDB(int serverId, DN baseDN, |
| | | ReplicationServer replicationServer, ReplicationDbEnv dbenv) |
| | | throws ChangelogException |
| | | { |
| | | this.serverId = serverId; |
| | | this.baseDn = baseDn; |
| | | this.baseDN = baseDN; |
| | | this.dbenv = dbenv; |
| | | this.replicationServer = replicationServer; |
| | | |
| | | // Get or create the associated ReplicationServerDomain and Db. |
| | | final ReplicationServerDomain domain = |
| | | replicationServer.getReplicationServerDomain(baseDn, true); |
| | | db = dbenv.getOrAddDb(serverId, baseDn, domain.getGenerationId()); |
| | | replicationServer.getReplicationServerDomain(baseDN, true); |
| | | db = dbenv.getOrAddDb(serverId, baseDN, domain.getGenerationId()); |
| | | |
| | | |
| | | intializeCounters(); |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return serverId + baseDn; |
| | | return serverId + " " + baseDN.toNormalizedString(); |
| | | } |
| | | |
| | | /** |
| | |
| | | String dbName = db.getDatabaseName(); |
| | | |
| | | // Clears the reference to this serverID |
| | | dbenv.clearServerId(baseDn, serverId); |
| | | dbenv.clearServerId(baseDN, serverId); |
| | | |
| | | // Closing is requested by the Berkeley DB before truncate |
| | | db.close(); |
| | |
| | | dbenv.clearDb(dbName); |
| | | |
| | | // RE-create the db |
| | | db = dbenv.getOrAddDb(serverId, baseDn, -1); |
| | | db = dbenv.getOrAddDb(serverId, baseDN, -1); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | |
| | | import com.sleepycat.je.*; |
| | | |
| | |
| | | final String stringData = toString(data.getData()); |
| | | |
| | | if (debugEnabled()) |
| | | debug("read (" + GENERATION_ID_TAG + " generationId baseDn) OR " |
| | | debug("read (" + GENERATION_ID_TAG + " generationId baseDN) OR " |
| | | + "(serverId baseDN): " + stringData); |
| | | |
| | | final String[] str = stringData.split(FIELD_SEPARATOR, 3); |
| | | if (str[0].equals(GENERATION_ID_TAG)) |
| | | { |
| | | long generationId = toLong(str[1]); |
| | | String baseDn = str[2]; |
| | | DN baseDN = DN.decode(str[2]); |
| | | |
| | | if (debugEnabled()) |
| | | debug("has read baseDn=" + baseDn + " generationId=" +generationId); |
| | | debug("has read baseDN=" + baseDN + " generationId=" +generationId); |
| | | |
| | | result.setDomainGenerationId(baseDn, generationId); |
| | | result.setDomainGenerationId(baseDN, generationId); |
| | | } |
| | | else |
| | | { |
| | | int serverId = toInt(str[0]); |
| | | String baseDn = str[1]; |
| | | DN baseDN = DN.decode(str[1]); |
| | | |
| | | if (debugEnabled()) |
| | | debug("has read: baseDn=" + baseDn + " serverId=" + serverId); |
| | | debug("has read: baseDN=" + baseDN + " serverId=" + serverId); |
| | | |
| | | result.addServerIdToDomain(serverId, baseDn); |
| | | result.addServerIdToDomain(serverId, baseDN); |
| | | } |
| | | |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | |
| | | { |
| | | throw new ChangelogException(e); |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | throw new ChangelogException(e); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | |
| | | |
| | | /** |
| | | * Finds or creates the database used to store changes from the server with |
| | | * the given serverId and the given baseDn. |
| | | * the given serverId and the given baseDN. |
| | | * |
| | | * @param serverId |
| | | * The server id that identifies the server. |
| | | * @param baseDn |
| | | * The baseDn that identifies the domain. |
| | | * @param baseDN |
| | | * The baseDN that identifies the domain. |
| | | * @param generationId |
| | | * The generationId associated to this domain. |
| | | * @return the Database. |
| | | * @throws ChangelogException |
| | | * in case of underlying Exception. |
| | | */ |
| | | public Database getOrAddDb(int serverId, String baseDn, long generationId) |
| | | public Database getOrAddDb(int serverId, DN baseDN, long generationId) |
| | | throws ChangelogException |
| | | { |
| | | if (debugEnabled()) |
| | | debug("ReplicationDbEnv.getOrAddDb(" + serverId + ", " + baseDn + ", " |
| | | debug("ReplicationDbEnv.getOrAddDb(" + serverId + ", " + baseDN + ", " |
| | | + generationId + ")"); |
| | | try |
| | | { |
| | | // JNR: redundant info is stored between the key and data down below. |
| | | // It is probably ok since "changelogstate" DB does not receive a high |
| | | // volume of inserts. |
| | | final String serverIdToBaseDn = buildServerIdKey(baseDn, serverId); |
| | | final String serverIdToBaseDn = buildServerIdKey(baseDN, serverId); |
| | | |
| | | // Opens the DB for the changes received from this server on this domain. |
| | | Database db = openDatabase(serverIdToBaseDn); |
| | | |
| | | putInChangelogStateDBIfNotExist(serverIdToBaseDn, serverIdToBaseDn); |
| | | putInChangelogStateDBIfNotExist(buildGenIdKey(baseDn), |
| | | buildGenIdData(baseDn, generationId)); |
| | | putInChangelogStateDBIfNotExist(buildGenIdKey(baseDN), |
| | | buildGenIdData(baseDN, generationId)); |
| | | return db; |
| | | } |
| | | catch (RuntimeException e) |
| | |
| | | } |
| | | } |
| | | |
| | | private String buildGenIdKey(String baseDn) |
| | | private String buildGenIdKey(DN baseDN) |
| | | { |
| | | return GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn; |
| | | return GENERATION_ID_TAG + FIELD_SEPARATOR + baseDN.toNormalizedString(); |
| | | } |
| | | |
| | | private String buildServerIdKey(String baseDn, int serverId) |
| | | private String buildServerIdKey(DN baseDN, int serverId) |
| | | { |
| | | return serverId + FIELD_SEPARATOR + baseDn; |
| | | return serverId + FIELD_SEPARATOR + baseDN.toNormalizedString(); |
| | | } |
| | | |
| | | private String buildGenIdData(String baseDn, long generationId) |
| | | private String buildGenIdData(DN baseDN, long generationId) |
| | | { |
| | | return GENERATION_ID_TAG + FIELD_SEPARATOR + generationId + FIELD_SEPARATOR |
| | | + baseDn; |
| | | + baseDN.toNormalizedString(); |
| | | } |
| | | |
| | | private void putInChangelogStateDBIfNotExist(String keyString, |
| | |
| | | } |
| | | |
| | | /** |
| | | * Clears the provided generationId associated to the provided baseDn from the |
| | | * Clears the provided generationId associated to the provided baseDN from the |
| | | * state Db. |
| | | * |
| | | * @param baseDn |
| | | * The baseDn for which the generationID must be cleared. |
| | | * @param baseDN |
| | | * The baseDN for which the generationID must be cleared. |
| | | */ |
| | | public void clearGenerationId(String baseDn) |
| | | public void clearGenerationId(DN baseDN) |
| | | { |
| | | deleteFromChangelogStateDB(buildGenIdKey(baseDn), |
| | | "clearGenerationId(baseDN=" + baseDn + ")"); |
| | | deleteFromChangelogStateDB(buildGenIdKey(baseDN), |
| | | "clearGenerationId(baseDN=" + baseDN + ")"); |
| | | } |
| | | |
| | | /** |
| | | * Clears the provided serverId associated to the provided baseDn from the |
| | | * Clears the provided serverId associated to the provided baseDN from the |
| | | * state Db. |
| | | * |
| | | * @param baseDn |
| | | * The baseDn for which the generationID must be cleared. |
| | | * @param baseDN |
| | | * The baseDN for which the serverId must be cleared. |
| | | * @param serverId |
| | | * The serverId to remove from the Db. |
| | | */ |
| | | public void clearServerId(String baseDn, int serverId) |
| | | public void clearServerId(DN baseDN, int serverId) |
| | | { |
| | | deleteFromChangelogStateDB(buildServerIdKey(baseDn, serverId), |
| | | "clearServerId(baseDN=" + baseDn + " , serverId=" + serverId + ")"); |
| | | deleteFromChangelogStateDB(buildServerIdKey(baseDN, serverId), |
| | | "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")"); |
| | | } |
| | | |
| | | private void deleteFromChangelogStateDB(String keyString, |
| | |
| | | import org.opends.server.replication.common.*; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.util.ServerConstants; |
| | | |
| | |
| | | private volatile String replicationServer = NO_CONNECTED_SERVER; |
| | | private volatile Session session = null; |
| | | private final ServerState state; |
| | | private final String baseDn; |
| | | private final DN baseDN; |
| | | private final int serverId; |
| | | private Semaphore sendWindow; |
| | | private int maxSendWindow; |
| | |
| | | * @param replicationDomain The replication domain that is creating us. |
| | | * @param state The ServerState that should be used by this broker |
| | | * when negotiating the session with the replicationServer. |
| | | * @param baseDn The base DN that should be used by this broker |
| | | * @param baseDN The base DN that should be used by this broker |
| | | * when negotiating the session with the replicationServer. |
| | | * @param serverID2 The server ID that should be used by this broker |
| | | * @param serverId The server ID that should be used by this broker |
| | | * when negotiating the session with the replicationServer. |
| | | * @param window The size of the send and receive window to use. |
| | | * @param generationId The generationId for the server associated to the |
| | |
| | | * or zero if no CSN heartbeat should be sent. |
| | | */ |
| | | public ReplicationBroker(ReplicationDomain replicationDomain, |
| | | ServerState state, String baseDn, int serverID2, int window, |
| | | ServerState state, DN baseDN, int serverId, int window, |
| | | long generationId, long heartbeatInterval, |
| | | ReplSessionSecurity replSessionSecurity, byte groupId, |
| | | long changeTimeHeartbeatInterval) |
| | | { |
| | | this.domain = replicationDomain; |
| | | this.baseDn = baseDn; |
| | | this.serverId = serverID2; |
| | | this.baseDN = baseDN; |
| | | this.serverId = serverId; |
| | | this.state = state; |
| | | this.protocolVersion = ProtocolVersion.getCurrentVersion(); |
| | | this.replSessionSecurity = replSessionSecurity; |
| | |
| | | { |
| | | shutdown = false; |
| | | this.rcvWindow = this.maxRcvWindow; |
| | | this.connect(); |
| | | connect(); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | this.rcvWindow = this.maxRcvWindow; |
| | | this.connect(); |
| | | connect(); |
| | | } |
| | | } |
| | | |
| | |
| | | |
| | | private void connect() |
| | | { |
| | | if (this.baseDn.compareToIgnoreCase( |
| | | ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT) == 0) |
| | | if (this.baseDN.toNormalizedString().equalsIgnoreCase( |
| | | ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)) |
| | | { |
| | | connectAsECL(); |
| | | } else |
| | |
| | | || (electedRsInfo.getGenerationId() == -1)) |
| | | { |
| | | Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG |
| | | .get(serverId, rsServerId, baseDn, |
| | | .get(serverId, rsServerId, baseDN.toNormalizedString(), |
| | | session.getReadableRemoteAddress(), |
| | | getGenerationID()); |
| | | logError(message); |
| | | } else |
| | | { |
| | | Message message = WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG |
| | | .get(serverId, rsServerId, baseDn, |
| | | .get(serverId, rsServerId, baseDN.toNormalizedString(), |
| | | session.getReadableRemoteAddress(), |
| | | getGenerationID(), |
| | | electedRsInfo.getGenerationId()); |
| | |
| | | { |
| | | Message message = WARN_COULD_NOT_FIND_CHANGELOG.get( |
| | | serverId, |
| | | baseDn, |
| | | collectionToString(replicationServerInfos.keySet(), |
| | | ", ")); |
| | | baseDN.toNormalizedString(), |
| | | collectionToString(replicationServerInfos.keySet(), ", ")); |
| | | logError(message); |
| | | } |
| | | else |
| | | { |
| | | Message message = WARN_NO_AVAILABLE_CHANGELOGS.get( |
| | | serverId, baseDn); |
| | | serverId, baseDN.toNormalizedString()); |
| | | logError(message); |
| | | } |
| | | } |
| | |
| | | warn user and start heartbeat monitor to recover when a server |
| | | with the right group id shows up. |
| | | */ |
| | | Message message = |
| | | WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(Byte |
| | | .toString(groupId), Integer.toString(rsServerId), rsInfo |
| | | .getServerURL(), Byte.toString(getRsGroupId()), baseDn, Integer |
| | | .toString(serverId)); |
| | | Message message = WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get( |
| | | Byte.toString(groupId), Integer.toString(rsServerId), |
| | | rsInfo.getServerURL(), Byte.toString(getRsGroupId()), |
| | | baseDN.toNormalizedString(), Integer.toString(serverId)); |
| | | logError(message); |
| | | } |
| | | startRSHeartBeatMonitoring(); |
| | |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | Message message = |
| | | ERR_COMPUTING_FAKE_OPS.get(baseDn, rsInfo.getServerURL(), e |
| | | .getLocalizedMessage() |
| | | + stackTraceToSingleLineString(e)); |
| | | Message message = ERR_COMPUTING_FAKE_OPS.get( |
| | | baseDN.toNormalizedString(), rsInfo.getServerURL(), |
| | | e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | } |
| | | finally |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("RB for dn " + baseDn + " and with server id " |
| | | TRACER.debugInfo("RB for dn " + baseDN + " and with server id " |
| | | + serverId + " computed " + nChanges + " changes late."); |
| | | } |
| | | |
| | |
| | | String port = server.substring(separator + 1); |
| | | String hostname = server.substring(0, separator); |
| | | |
| | | final String baseDn = this.baseDN.toNormalizedString(); |
| | | |
| | | Session localSession = null; |
| | | Socket socket = null; |
| | | boolean hasConnected = false; |
| | |
| | | |
| | | try |
| | | { |
| | | /* |
| | | * Open a socket connection to the next candidate. |
| | | */ |
| | | // Open a socket connection to the next candidate. |
| | | int intPort = Integer.parseInt(port); |
| | | InetSocketAddress serverAddr = new InetSocketAddress( |
| | | InetAddress.getByName(hostname), intPort); |
| | |
| | | StartMsg serverStartMsg; |
| | | if (!isECL) |
| | | { |
| | | serverStartMsg = new ServerStartMsg(serverId, url, baseDn, |
| | | maxRcvWindow, heartbeatInterval, state, |
| | | this.getGenerationID(), isSslEncryption, groupId); |
| | | serverStartMsg = new ServerStartMsg(serverId, url, |
| | | baseDN.toNormalizedString(), maxRcvWindow, heartbeatInterval, state, |
| | | getGenerationID(), isSslEncryption, groupId); |
| | | } |
| | | else |
| | | { |
| | | serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0, |
| | | maxRcvWindow, heartbeatInterval, state, |
| | | this.getGenerationID(), isSslEncryption, groupId); |
| | | getGenerationID(), isSslEncryption, groupId); |
| | | } |
| | | localSession.publish(serverStartMsg); |
| | | |
| | |
| | | ReplicationMsg msg = localSession.receive(); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n" |
| | | TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n" |
| | | + serverStartMsg + "\nAND RECEIVED:\n" + msg); |
| | | } |
| | | |
| | |
| | | |
| | | // Sanity check |
| | | String repDn = replServerInfo.getBaseDn(); |
| | | if (!this.baseDn.equals(repDn)) |
| | | if (!baseDn.equals(repDn)) |
| | | { |
| | | errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDn, |
| | | this.baseDn); |
| | | errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDn, baseDn); |
| | | return null; |
| | | } |
| | | |
| | |
| | | { |
| | | if (!hasConnected || !keepConnection) |
| | | { |
| | | if (localSession != null) |
| | | { |
| | | localSession.close(); |
| | | } |
| | | |
| | | if (socket != null) |
| | | { |
| | | try |
| | | { |
| | | socket.close(); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | // Ignore. |
| | | } |
| | | } |
| | | close(localSession); |
| | | close(socket); |
| | | } |
| | | |
| | | if (!hasConnected && errorMessage != null) |
| | |
| | | * reply message from the replication server. |
| | | * |
| | | * @param server Server we are connecting with. |
| | | * @return The ReplServerStartMsg the server replied. Null if could not |
| | | * get an answer. |
| | | */ |
| | | private TopologyMsg performECLPhaseTwoHandshake(String server) |
| | | private void performECLPhaseTwoHandshake(String server) |
| | | { |
| | | TopologyMsg topologyMsg = null; |
| | | |
| | | try |
| | | { |
| | | // Send our Start Session |
| | |
| | | startECLSessionMsg.setOperationId("-1"); |
| | | session.publish(startECLSessionMsg); |
| | | |
| | | /* FIXME:ECL In the handshake phase two, should RS send back a topo msg ? |
| | | * Read the TopologyMsg that should come back. |
| | | topologyMsg = (TopologyMsg) session.receive(); |
| | | */ |
| | | // FIXME ECL In the handshake phase two, should RS send back a topo msg ? |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n" |
| | | TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n" |
| | | + startECLSessionMsg); |
| | | } |
| | | |
| | | // Alright set the timeout to the desired value |
| | | session.setSoTimeout(timeout); |
| | | connected = true; |
| | | |
| | | } catch (Exception e) |
| | | { |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId, |
| | | server, baseDn, stackTraceToSingleLineString(e)); |
| | | server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | |
| | | setSession(null); |
| | | |
| | | // Be sure to return null. |
| | | topologyMsg = null; |
| | | } |
| | | return topologyMsg; |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n" |
| | | TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n" |
| | | + startSessionMsg + "\nAND RECEIVED:\n" + topologyMsg); |
| | | } |
| | | |
| | |
| | | } catch (Exception e) |
| | | { |
| | | Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId, |
| | | server, baseDn, stackTraceToSingleLineString(e)); |
| | | server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | |
| | | setSession(null); |
| | |
| | | // Start a heartbeat monitor thread. |
| | | if (heartbeatInterval > 0) |
| | | { |
| | | heartbeatMonitor = new HeartbeatMonitor(getServerId(), |
| | | getRsServerId(), baseDn, session, heartbeatInterval); |
| | | heartbeatMonitor = new HeartbeatMonitor(getServerId(), getRsServerId(), |
| | | baseDN.toNormalizedString(), session, heartbeatInterval); |
| | | heartbeatMonitor.start(); |
| | | } |
| | | } |
| | |
| | | catch (Exception e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(baseDn, |
| | | e.getLocalizedMessage())); |
| | | mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get( |
| | | baseDN.toNormalizedString(), e.getLocalizedMessage())); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | } |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo(this + " end restart : connected=" + connected |
| | | + " with RSid=" + this.getRsServerId() + " genid=" + this.generationID); |
| | | + " with RSid=" + getRsServerId() + " genid=" + this.generationID); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | else if (msg instanceof StopMsg) |
| | | { |
| | | /* |
| | | * RS performs a proper disconnection |
| | | */ |
| | | Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED |
| | | .get(replicationServerID, |
| | | savedSession.getReadableRemoteAddress(), |
| | | serverId, baseDn); |
| | | // RS performs a proper disconnection |
| | | Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get( |
| | | replicationServerID, savedSession.getReadableRemoteAddress(), |
| | | serverId, baseDN.toNormalizedString()); |
| | | logError(message); |
| | | |
| | | // Try to find a suitable RS |
| | | this.reStart(savedSession, true); |
| | | reStart(savedSession, true); |
| | | } |
| | | else if (msg instanceof MonitorMsg) |
| | | { |
| | |
| | | message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get( |
| | | serverId, replicationServerID, |
| | | savedSession.getReadableRemoteAddress(), |
| | | baseDn); |
| | | baseDN.toNormalizedString()); |
| | | } |
| | | else |
| | | { |
| | | message = NOTE_NEW_BEST_REPLICATION_SERVER.get( |
| | | serverId, replicationServerID, |
| | | savedSession.getReadableRemoteAddress(), |
| | | bestServerInfo.getServerId(), baseDn); |
| | | bestServerInfo.getServerId(), |
| | | baseDN.toNormalizedString()); |
| | | } |
| | | logError(message); |
| | | reStart(true); |
| | |
| | | final Session tmpSession = session; |
| | | if (tmpSession == null || !tmpSession.closeInitiated()) |
| | | { |
| | | /* |
| | | * We did not initiate the close on our side, log an error message. |
| | | */ |
| | | Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED |
| | | .get(serverId, baseDn, replicationServerID, |
| | | // We did not initiate the close on our side, log an error message. |
| | | Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get( |
| | | serverId, baseDN.toNormalizedString(), replicationServerID, |
| | | savedSession.getReadableRemoteAddress()); |
| | | logError(message); |
| | | } |
| | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("ReplicationBroker " + serverId + " is stopping and will" |
| | | + " close the connection to replication server " + rsServerId + " for" |
| | | + " domain " + baseDn); |
| | | + " domain " + baseDN); |
| | | |
| | | synchronized (startStopLock) |
| | | { |
| | |
| | | if (connected) |
| | | { |
| | | return sendWindow.availablePermits(); |
| | | } else |
| | | { |
| | | return 0; |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | |
| | | } catch (IOException ex) |
| | | { |
| | | Message message = ERR_EXCEPTION_SENDING_CS.get( |
| | | baseDn, |
| | | baseDN.toNormalizedString(), |
| | | Integer.toString(serverId), |
| | | ex.getLocalizedMessage() + stackTraceToSingleLineString(ex)); |
| | | ex.getLocalizedMessage() + " " + stackTraceToSingleLineString(ex)); |
| | | logError(message); |
| | | } |
| | | } |
| | |
| | | // Start a CSN heartbeat thread. |
| | | if (changeTimeHeartbeatSendInterval > 0) |
| | | { |
| | | String threadName = "Replica DS(" |
| | | + this.getServerId() |
| | | String threadName = "Replica DS(" + getServerId() |
| | | + ") change time heartbeat publisher for domain \"" |
| | | + this.baseDn + "\" to RS(" + this.getRsServerId() |
| | | + this.baseDN + "\" to RS(" + getRsServerId() |
| | | + ") at " + session.getReadableRemoteAddress(); |
| | | |
| | | ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread( |
| | |
| | | import org.opends.server.tasks.InitializeTargetTask; |
| | | import org.opends.server.tasks.InitializeTask; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.ResultCode; |
| | | |
| | |
| | | * and which can start receiving updates. |
| | | * <p> |
| | | * When updates are received the Replication Service calls the |
| | | * {@link #processUpdate(UpdateMsg)} method. |
| | | * {@link #processUpdate(UpdateMsg, AtomicBoolean)} method. |
| | | * ReplicationDomain implementation should implement the appropriate code |
| | | * for replaying the update on the local repository. |
| | | * When fully done the subclass must call the |
| | |
| | | * All Replication Domain using this baseDN will be connected |
| | | * through the Replication Service. |
| | | */ |
| | | private final String baseDN; |
| | | private final DN baseDN; |
| | | |
| | | /** |
| | | * The identifier of this Replication Domain inside the |
| | |
| | | /** |
| | | * A Map used to store all the ReplicationDomains created on this server. |
| | | */ |
| | | private static Map<String, ReplicationDomain> domains = |
| | | new HashMap<String, ReplicationDomain>(); |
| | | private static Map<DN, ReplicationDomain> domains = |
| | | new HashMap<DN, ReplicationDomain>(); |
| | | |
| | | /* |
| | | * Assured mode properties |
| | |
| | | * is participating to a given Replication Domain. |
| | | * @param initWindow Window used during initialization. |
| | | */ |
| | | public ReplicationDomain(String baseDN, int serverID,int initWindow) |
| | | public ReplicationDomain(DN baseDN, int serverID, int initWindow) |
| | | { |
| | | this.baseDN = baseDN; |
| | | this.serverID = serverID; |
| | |
| | | * is participating to a given Replication Domain. |
| | | * @param serverState The serverState to use |
| | | */ |
| | | public ReplicationDomain(String baseDN, int serverID, |
| | | ServerState serverState) |
| | | public ReplicationDomain(DN baseDN, int serverID, ServerState serverState) |
| | | { |
| | | this.baseDN = baseDN; |
| | | this.serverID = serverID; |
| | |
| | | if (!isValidInitialStatus(initStatus)) |
| | | { |
| | | Message msg = ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(), |
| | | baseDN, Integer.toString(serverID)); |
| | | getBaseDNString(), Integer.toString(serverID)); |
| | | logError(msg); |
| | | } else |
| | | { |
| | |
| | | if (event == StatusMachineEvent.INVALID_EVENT) |
| | | { |
| | | Message msg = ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(), |
| | | baseDN, Integer.toString(serverID)); |
| | | getBaseDNString(), Integer.toString(serverID)); |
| | | logError(msg); |
| | | return; |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns the base DN of this ReplicationDomain. |
| | | * |
| | | * @return The base DN of this ReplicationDomain |
| | | */ |
| | | public DN getBaseDN() |
| | | { |
| | | return baseDN; |
| | | } |
| | | |
| | | /** |
| | | * Gets the baseDN of this domain. |
| | | * |
| | | * @return The baseDN for this domain. |
| | | */ |
| | | public String getBaseDNString() |
| | | { |
| | | return baseDN; |
| | | return baseDN.toNormalizedString(); |
| | | } |
| | | |
| | | /** |
| | |
| | | requested servers. Log problem |
| | | */ |
| | | Message errorMsg = NOTE_DS_RECEIVED_ACK_ERROR.get( |
| | | baseDN, Integer.toString(serverID), |
| | | getBaseDNString(), Integer.toString(serverID), |
| | | update.toString(), ack.errorsToString()); |
| | | logError(errorMsg); |
| | | |
| | |
| | | if (serverToInitialize == RoutableMsg.ALL_SERVERS) |
| | | { |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL.get( |
| | | countEntries(), baseDN, serverID); |
| | | countEntries(), getBaseDNString(), serverID); |
| | | logError(msg); |
| | | |
| | | for (DSInfo dsi : getReplicasList()) |
| | |
| | | else |
| | | { |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get( |
| | | countEntries(), baseDN, serverID, serverToInitialize); |
| | | countEntries(), getBaseDNString(), serverID, serverToInitialize); |
| | | logError(msg); |
| | | |
| | | ieContext.startList.add(serverToInitialize); |
| | |
| | | |
| | | // Send start message to the peer |
| | | InitializeTargetMsg initTargetMsg = new InitializeTargetMsg( |
| | | baseDN, serverID, serverToInitialize, serverRunningTheTask, |
| | | ieContext.entryCount, initWindow); |
| | | getBaseDNString(), serverID, serverToInitialize, |
| | | serverRunningTheTask, ieContext.entryCount, initWindow); |
| | | |
| | | broker.publish(initTargetMsg); |
| | | |
| | |
| | | // Notify the peer of the success |
| | | DoneMsg doneMsg = new DoneMsg(serverID, initTargetMsg.getDestination()); |
| | | broker.publish(doneMsg); |
| | | |
| | | } |
| | | catch(DirectoryException exportException) |
| | | { |
| | |
| | | .getLocalizedMessage() : ""; |
| | | if (serverToInitialize == RoutableMsg.ALL_SERVERS) |
| | | { |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL |
| | | .get(baseDN, serverID, cause); |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL.get( |
| | | getBaseDNString(), serverID, cause); |
| | | logError(msg); |
| | | } |
| | | else |
| | | { |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get( |
| | | baseDN, serverID, serverToInitialize, cause); |
| | | getBaseDNString(), serverID, serverToInitialize, cause); |
| | | logError(msg); |
| | | } |
| | | |
| | |
| | | Message errMsg = |
| | | Message.raw(Category.SYNC, Severity.NOTICE, |
| | | ERR_INIT_EXPORTER_DISCONNECTION.get( |
| | | this.baseDN, |
| | | getBaseDNString(), |
| | | Integer.toString(this.serverID), |
| | | Integer.toString(ieContext.importSource))); |
| | | if (ieContext.getException()==null) |
| | |
| | | ieContext.initializeTask = initTask; |
| | | ieContext.attemptCnt = 0; |
| | | ieContext.initReqMsgSent = new InitializeRequestMsg( |
| | | baseDN, serverID, source, this.initWindow); |
| | | getBaseDNString(), serverID, source, this.initWindow); |
| | | |
| | | // Publish Init request msg |
| | | broker.publish(ieContext.initReqMsgSent); |
| | |
| | | { |
| | | // Log starting |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get( |
| | | baseDN, initTargetMsgReceived.getSenderID(), serverID); |
| | | getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID); |
| | | logError(msg); |
| | | |
| | | // Go into full update status |
| | |
| | | finally |
| | | { |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get( |
| | | baseDN, initTargetMsgReceived.getSenderID(), serverID, |
| | | getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID, |
| | | (ieContext.getException() != null ? ieContext |
| | | .getException().getLocalizedMessage() : "")); |
| | | logError(msg); |
| | |
| | | |
| | | if (newStatus == ServerStatus.INVALID_STATUS) |
| | | { |
| | | Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDN, |
| | | Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(getBaseDNString(), |
| | | Integer.toString(serverID), status.toString(), event.toString()); |
| | | logError(msg); |
| | | return; |
| | |
| | | } |
| | | if (!allSet) |
| | | { |
| | | ResultCode resultCode = ResultCode.OTHER; |
| | | Message message = ERR_RESET_GENERATION_ID_FAILED.get(baseDN); |
| | | throw new DirectoryException( |
| | | resultCode, message); |
| | | Message message = ERR_RESET_GENERATION_ID_FAILED.get(getBaseDNString()); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | } |
| | | |
| | |
| | | if (!isConnected()) |
| | | { |
| | | ResultCode resultCode = ResultCode.OTHER; |
| | | Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(baseDN, |
| | | Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDNString(), |
| | | Integer.toString(serverID), |
| | | Long.toString(genIdMessage.getGenerationId())); |
| | | throw new DirectoryException( |
| | |
| | | * Starts the receiver side of the Replication Service. |
| | | * <p> |
| | | * After this method has been called, the Replication Service will start |
| | | * calling the {@link #processUpdate(UpdateMsg)}. |
| | | * calling the {@link #processUpdate(UpdateMsg, AtomicBoolean)}. |
| | | * <p> |
| | | * This method must be called once and must be called after the |
| | | * {@link #startPublishService(Collection, int, long, long)}. |
| | | * |
| | | */ |
| | | public void startListenService() |
| | | { |
| | |
| | | } else if (assuredMode != AssuredMode.SAFE_DATA_MODE) |
| | | { |
| | | Message errorMsg = ERR_DS_UNKNOWN_ASSURED_MODE.get( |
| | | Integer.toString(serverID), msgAssuredMode.toString(), baseDN, |
| | | msg.toString()); |
| | | Integer.toString(serverID), msgAssuredMode.toString(), |
| | | getBaseDNString(), msg.toString()); |
| | | logError(errorMsg); |
| | | } |
| | | // Nothing to do in Assured safe data mode, only RS ack updates. |
| | |
| | | * Publish an {@link UpdateMsg} to the Replication Service. |
| | | * <p> |
| | | * The Replication Service will handle the delivery of this {@link UpdateMsg} |
| | | * to all the participants of this Replication Domain. |
| | | * These members will be receive this {@link UpdateMsg} through a call |
| | | * of the {@link #processUpdate(UpdateMsg)} message. |
| | | * to all the participants of this Replication Domain. These members will be |
| | | * receive this {@link UpdateMsg} through a call of the |
| | | * {@link #processUpdate(UpdateMsg, AtomicBoolean)} message. |
| | | * |
| | | * @param msg The UpdateMsg that should be pushed. |
| | | */ |
| | |
| | | { |
| | | // This exception may only be raised if assured replication is |
| | | // enabled |
| | | Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(baseDN, Long.toString( |
| | | assuredTimeout), update.toString()); |
| | | Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(getBaseDNString(), |
| | | Long.toString(assuredTimeout), update.toString()); |
| | | logError(errorMsg); |
| | | } |
| | | } |
| | |
| | | package org.opends.server.replication; |
| | | |
| | | import java.io.File; |
| | | import java.net.SocketException; |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.*; |
| | | |
| | |
| | | private static final int changelog2ID = 12; |
| | | private static final int changelog3ID = 13; |
| | | |
| | | private DN baseDn; |
| | | private DN baseDN; |
| | | private ReplicationBroker broker2 = null; |
| | | private ReplicationBroker broker3 = null; |
| | | private ReplicationServer replServer1 = null; |
| | |
| | | { |
| | | super.setUp(); |
| | | |
| | | baseDn = DN.decode(baseDnStr); |
| | | baseDN = DN.decode(baseDnStr); |
| | | |
| | | updatedEntries = newLDIFEntries(); |
| | | |
| | |
| | | "objectclass: ds-task", |
| | | "objectclass: ds-task-initialize-remote-replica", |
| | | "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask", |
| | | "ds-task-initialize-domain-dn: " + baseDn, |
| | | "ds-task-initialize-domain-dn: " + baseDN, |
| | | "ds-task-initialize-replica-server-id: " + server2ID); |
| | | } |
| | | |
| | |
| | | { |
| | | |
| | | return new String[]{ |
| | | "dn: " + baseDn + "\n" |
| | | "dn: " + baseDN + "\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: organization\n" |
| | | + "entryUUID: 21111111-1111-1111-1111-111111111111\n" |
| | | + "\n", |
| | | "dn: ou=People," + baseDn + "\n" |
| | | "dn: ou=People," + baseDN + "\n" |
| | | + "objectClass: top\n" |
| | | + "objectClass: organizationalUnit\n" |
| | | + "entryUUID: 21111111-1111-1111-1111-111111111112\n" |
| | | + "\n", |
| | | "dn: cn=Fiona Jensen,ou=people," + baseDn + "\n" |
| | | "dn: cn=Fiona Jensen,ou=people," + baseDN + "\n" |
| | | + "objectclass: top\n" |
| | | + "objectclass: person\n" |
| | | + "objectclass: organizationalPerson\n" |
| | |
| | | + "telephonenumber: +1 408 555 1212\n" |
| | | + "entryUUID: 21111111-1111-1111-1111-111111111113\n" |
| | | + "\n", |
| | | "dn: cn=Robert Langman,ou=people," + baseDn + "\n" |
| | | "dn: cn=Robert Langman,ou=people," + baseDN + "\n" |
| | | + "objectclass: top\n" |
| | | + "objectclass: person\n" |
| | | + "objectclass: organizationalPerson\n" |
| | |
| | | * @return The new created replication server. |
| | | */ |
| | | private ReplicationServer createReplicationServer(int changelogId, |
| | | boolean all, String testCase) |
| | | boolean all, String testCase) throws Exception |
| | | { |
| | | SortedSet<String> servers = new TreeSet<String>(); |
| | | try |
| | | { |
| | | if (all) |
| | | { |
| | | if (changelogId != changelog1ID) |
| | |
| | | int chPort = getChangelogPort(changelogId); |
| | | String chDir = "generationIdTest"+changelogId+testCase+"Db"; |
| | | ReplServerFakeConfiguration conf = |
| | | new ReplServerFakeConfiguration(chPort, chDir, 0, changelogId, 0, 100, |
| | | servers); |
| | | new ReplServerFakeConfiguration(chPort, chDir, 0, changelogId, 0, 100, servers); |
| | | ReplicationServer replicationServer = new ReplicationServer(conf); |
| | | Thread.sleep(1000); |
| | | |
| | | return replicationServer; |
| | | |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | fail("createChangelog" + stackTraceToSingleLineString(e)); |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | |
| | | * replication Server ID. |
| | | * @param changeLogID replication Server ID |
| | | */ |
| | | private void connectServer1ToChangelog(int changeLogID) |
| | | private void connectServer1ToChangelog(int changeLogID) throws Exception |
| | | { |
| | | // Connect DS to the replicationServer |
| | | try |
| | | { |
| | | // suffix synchronized |
| | | String synchroServerLdif = |
| | |
| | | LDAPReplicationDomain doToco=null; |
| | | while(waitCo<50) |
| | | { |
| | | doToco = |
| | | LDAPReplicationDomain.retrievesReplicationDomain(baseDn); |
| | | doToco = LDAPReplicationDomain.retrievesReplicationDomain(baseDN); |
| | | if (doToco != null && doToco.isConnected()) |
| | | { |
| | | break; |
| | |
| | | assertTrue(doToco.isConnected(), "not connected after #attempt="+waitCo); |
| | | debugInfo("ReplicationDomain: Import/Export is running ? " + doToco.ieRunning()); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | debugInfo("connectToReplServer", e); |
| | | fail("connectToReplServer", e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Disconnect DS from the replicationServer |
| | | */ |
| | | private void disconnectFromReplServer(int changelogID) |
| | | private void disconnectFromReplServer(int changelogID) throws Exception |
| | | { |
| | | try |
| | | { |
| | | // suffix synchronized |
| | | String synchroServerStringDN = "cn=" + testName + ", cn=domains," + |
| | |
| | | |
| | | DN synchroServerDN = DN.decode(synchroServerStringDN); |
| | | |
| | | Entry ecle; |
| | | ecle = DirectoryServer.getConfigHandler().getEntry( |
| | | Entry ecle = DirectoryServer.getConfigHandler().getEntry( |
| | | DN.decode("cn=external changelog," + synchroServerStringDN)); |
| | | if (ecle!=null) |
| | | { |
| | |
| | | int waitCo=0; |
| | | while(waitCo<30) |
| | | { |
| | | replDomainToDis = |
| | | LDAPReplicationDomain.retrievesReplicationDomain(baseDn); |
| | | replDomainToDis = LDAPReplicationDomain.retrievesReplicationDomain(baseDN); |
| | | Thread.sleep(200); |
| | | waitCo++; |
| | | } |
| | |
| | | debugInfo("disconnectFromReplServer:" + changelogID, e); |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | fail("disconnectFromReplServer", e); |
| | | } |
| | | } |
| | | |
| | | private int getChangelogPort(int changelogID) throws Exception |
| | |
| | | protected static final String REPLICATION_GENERATION_ID = |
| | | "ds-sync-generation-id"; |
| | | |
| | | private long readGenIdFromSuffixRootEntry() |
| | | private long readGenIdFromSuffixRootEntry() throws Exception |
| | | { |
| | | long genId=-1; |
| | | try |
| | | { |
| | | Entry resultEntry = getEntry(baseDn, 1000, true); |
| | | Entry resultEntry = getEntry(baseDN, 1000, true); |
| | | if (resultEntry==null) |
| | | { |
| | | debugInfo("Entry not found <" + baseDn + ">"); |
| | | debugInfo("Entry not found <" + baseDN + ">"); |
| | | } |
| | | else |
| | | { |
| | | debugInfo("Entry found <" + baseDn + ">"); |
| | | debugInfo("Entry found <" + baseDN + ">"); |
| | | |
| | | AttributeType synchronizationGenIDType = |
| | | DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID); |
| | |
| | | |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | fail("Exception raised in readGenId", e); |
| | | } |
| | | return genId; |
| | | } |
| | | |
| | | private void performLdifImport() |
| | | { |
| | | try |
| | | private void performLdifImport() throws Exception |
| | | { |
| | | // Create a temporary test LDIF file. |
| | | /* |
| | | A temporary LDIF file containing some test entries. |
| | | */ |
| | | // A temporary LDIF file containing some test entries. |
| | | File ldifFile = File.createTempFile("import-test", ".ldif"); |
| | | String resourcePath = DirectoryServer.getInstanceRoot() + File.separator + |
| | | "config" + File.separator + "MakeLDIF"; |
| | | String resourcePath = |
| | | DirectoryServer.getInstanceRoot() + File.separator + "config" |
| | | + File.separator + "MakeLDIF"; |
| | | LdifFileWriter.makeLdif(ldifFile.getPath(), resourcePath, template); |
| | | |
| | | // Launch import of the Ldif file on the memory test backend |
| | | // Note: we do not use a task here as import task does not work on memory |
| | | // backend: it disables then re-enables backend which leads to backend |
| | | // object instance lost and this is not accepttable for a backend with |
| | | // object instance lost and this is not acceptable for a backend with |
| | | // non persistent data |
| | | LDIFImportConfig importConfig = |
| | | new LDIFImportConfig(ldifFile.getAbsolutePath()); |
| | | LDIFImportConfig importConfig = new LDIFImportConfig(ldifFile.getAbsolutePath()); |
| | | |
| | | MemoryBackend memoryBackend = |
| | | (MemoryBackend) DirectoryServer.getBackend(TEST_BACKEND_ID); |
| | | MemoryBackend memoryBackend = (MemoryBackend) DirectoryServer.getBackend(TEST_BACKEND_ID); |
| | | memoryBackend.importLDIF(importConfig); |
| | | |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | fail("Could not perform ldif import on memory test backend: " |
| | | + e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | private String createEntry(UUID uid) |
| | |
| | | + "userPassword: password\n" + "initials: AA\n"; |
| | | } |
| | | |
| | | static protected ReplicationMsg createAddMsg() |
| | | static protected ReplicationMsg createAddMsg() throws Exception |
| | | { |
| | | Entry personWithUUIDEntry = null; |
| | | String user1entryUUID; |
| | |
| | | + "userPassword: password\n" + "initials: AA\n" |
| | | + "entryUUID: " + user1entryUUID + "\n"; |
| | | |
| | | try |
| | | { |
| | | personWithUUIDEntry = TestCaseUtils.entryFromLdifString(entryWithUUIDldif); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | fail(e.getMessage()); |
| | | } |
| | | |
| | | // Create and publish an update message to add an entry. |
| | | return new AddMsg(gen.newCSN(), |
| | |
| | | |
| | | debugInfo(testCase + " Expect genId to be set in memory on the replication " + |
| | | " server side (not wrote on disk/db since no change occurred)."); |
| | | rgenId = replServer1.getGenerationId(baseDn.toNormalizedString()); |
| | | rgenId = replServer1.getGenerationId(baseDN); |
| | | assertEquals(rgenId, EMPTY_DN_GENID); |
| | | |
| | | // Clean for next test |
| | |
| | | assertTrue(genId != EMPTY_DN_GENID); |
| | | |
| | | debugInfo(testCase + " Test that the generationId is set on RS1"); |
| | | rgenId = replServer1.getGenerationId(baseDn.toNormalizedString()); |
| | | rgenId = replServer1.getGenerationId(baseDN); |
| | | assertEquals(genId, rgenId); |
| | | |
| | | //=========================================================== |
| | | debugInfo(testCase + " ** TEST ** DS2 connection to RS1 with bad genID"); |
| | | |
| | | try |
| | | { |
| | | broker2 = openReplicationSession(baseDn, |
| | | server2ID, 100, getChangelogPort(changelog1ID), |
| | | 1000, !emptyOldChanges, genId+1); |
| | | } |
| | | catch(SocketException se) |
| | | { |
| | | fail("DS2 with bad genID failed to connect to RS1."); |
| | | } |
| | | broker2 = openReplicationSession(baseDN, server2ID, 100, |
| | | getChangelogPort(changelog1ID), 1000, !emptyOldChanges, genId+1); |
| | | |
| | | //=========================================================== |
| | | debugInfo(testCase + " ** TEST ** DS3 connection to RS1 with good genID"); |
| | | try |
| | | { |
| | | broker3 = openReplicationSession(baseDn, |
| | | server3ID, 100, getChangelogPort(changelog1ID), 1000, !emptyOldChanges, genId); |
| | | } |
| | | catch(SocketException se) |
| | | { |
| | | fail("Broker connection is expected to be accepted."); |
| | | } |
| | | broker3 = openReplicationSession(baseDN, server3ID, 100, |
| | | getChangelogPort(changelog1ID), 1000, !emptyOldChanges, genId); |
| | | |
| | | //=========================================================== |
| | | debugInfo(testCase + " ** TEST ** DS2 (bad genID) changes must be ignored."); |
| | |
| | | checkChangelogSize(1); |
| | | |
| | | // Verify that DS3 receives this change |
| | | try |
| | | { |
| | | ReplicationMsg msg = broker3.receive(); |
| | | debugInfo("Broker 3 received expected update msg" + msg); |
| | | } |
| | | catch(SocketTimeoutException e) |
| | | { |
| | | fail("Update message is supposed to be received."); |
| | | } |
| | | |
| | | //=========================================================== |
| | | debugInfo(testCase + " ** TEST ** Persistence of the generation ID in RS1"); |
| | | |
| | | long genIdBeforeShut = |
| | | replServer1.getGenerationId(baseDn.toNormalizedString()); |
| | | long genIdBeforeShut = replServer1.getGenerationId(baseDN); |
| | | |
| | | debugInfo("Shutdown replServer1"); |
| | | broker2.stop(); |
| | |
| | | |
| | | debugInfo("Delay to allow DS to reconnect to replServer1"); |
| | | |
| | | long genIdAfterRestart = |
| | | replServer1.getGenerationId(baseDn.toNormalizedString()); |
| | | long genIdAfterRestart = replServer1.getGenerationId(baseDN); |
| | | debugInfo("Aft restart / replServer.genId=" + genIdAfterRestart); |
| | | assertTrue(replServer1!=null, "Replication server creation failed."); |
| | | assertTrue(genIdBeforeShut == genIdAfterRestart, |
| | |
| | | //=============================================================== |
| | | debugInfo(testCase + " ** TEST ** Import with new data set + reset will"+ |
| | | " spread a new gen ID on the topology, verify DS1 and RS1"); |
| | | try |
| | | { |
| | | debugInfo("Create again broker2"); |
| | | broker2 = openReplicationSession(baseDn, |
| | | broker2 = openReplicationSession(baseDN, |
| | | server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges, genId); |
| | | assertTrue(broker2.isConnected(), "Broker2 failed to connect to replication server"); |
| | | |
| | | debugInfo("Create again broker3"); |
| | | broker3 = openReplicationSession(baseDn, |
| | | broker3 = openReplicationSession(baseDN, |
| | | server3ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges, genId); |
| | | assertTrue(broker3.isConnected(), "Broker3 failed to connect to replication server"); |
| | | } |
| | | catch(SocketException se) |
| | | { |
| | | fail("Broker connection is expected to be accepted."); |
| | | } |
| | | |
| | | |
| | | debugInfo("Launch on-line import on DS1"); |
| | |
| | | + "is expected to be diffrent from previous one"); |
| | | |
| | | debugInfo("RS1 must have the new gen ID"); |
| | | rgenId = replServer1.getGenerationId(baseDn.toNormalizedString()); |
| | | rgenId = replServer1.getGenerationId(baseDN); |
| | | assertEquals(genId, rgenId, "DS and replServer are expected to have same genId."); |
| | | |
| | | debugInfo("RS1 must have been cleared since it has not the proper generation ID"); |
| | |
| | | |
| | | try |
| | | { |
| | | ReplicationMsg msg = broker2.receive(); |
| | | fail("No update message is supposed to be received by broker2 in bad gen id. " + msg); |
| | | ReplicationMsg msg2 = broker2.receive(); |
| | | fail("No update message is supposed to be received by broker2 in bad gen id. " + msg2); |
| | | } catch(SocketTimeoutException e) { /* expected */ } |
| | | |
| | | try |
| | | { |
| | | ReplicationMsg msg = broker3.receive(); |
| | | fail("No update message is supposed to be received by broker3 in bad gen id. " + msg); |
| | | ReplicationMsg msg2 = broker3.receive(); |
| | | fail("No update message is supposed to be received by broker3 in bad gen id. " + msg2); |
| | | } catch(SocketTimeoutException e) { /* expected */ } |
| | | |
| | | |
| | |
| | | |
| | | try |
| | | { |
| | | ReplicationMsg msg = broker3.receive(); |
| | | fail("No update message is supposed to be received by broker3 in bad gen id. "+ msg); |
| | | ReplicationMsg msg2 = broker3.receive(); |
| | | fail("No update message is supposed to be received by broker3 in bad gen id. "+ msg2); |
| | | } catch(SocketTimeoutException e) { /* expected */ } |
| | | |
| | | |
| | |
| | | broker2.stop(); |
| | | |
| | | // Simulates the broker restart at the end of the import |
| | | broker2 = openReplicationSession(baseDn, |
| | | broker2 = openReplicationSession(baseDN, |
| | | server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges, genId); |
| | | |
| | | broker3.stop(); |
| | | |
| | | // Simulates the broker restart at the end of the import |
| | | broker3 = openReplicationSession(baseDn, |
| | | broker3 = openReplicationSession(baseDN, |
| | | server3ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges, genId); |
| | | |
| | | debugInfo("Adding reset task to DS1"); |
| | |
| | | waitTaskState(taskReset, TaskState.COMPLETED_SUCCESSFULLY, null); |
| | | |
| | | debugInfo("Verify that RS1 has still the right genID"); |
| | | assertEquals(replServer1.getGenerationId(baseDn.toNormalizedString()), rgenId); |
| | | assertEquals(replServer1.getGenerationId(baseDN), rgenId); |
| | | |
| | | // Updates count in RS1 must stay unchanged = to 1 |
| | | Thread.sleep(500); |
| | |
| | | "Expecting that DS3 is not in bad gen id from RS1"); |
| | | |
| | | debugInfo("Verify that DS2 receives the add message stored in RS1 DB"); |
| | | try |
| | | { |
| | | ReplicationMsg msg = broker2.receive(); |
| | | assertTrue(msg instanceof AddMsg, "Excpected to receive an AddMsg but received: " + msg); |
| | | } |
| | | catch(SocketTimeoutException e) |
| | | { |
| | | fail("The msg stored in RS1 DB is expected to be received by DS2)"); |
| | | } |
| | | msg = broker2.receive(); |
| | | assertTrue(msg instanceof AddMsg, "Expected to receive an AddMsg but received: " + msg); |
| | | |
| | | debugInfo("Verify that DS3 receives the add message stored in RS1 DB"); |
| | | try |
| | | { |
| | | ReplicationMsg msg = broker3.receive(); |
| | | assertTrue(msg instanceof AddMsg, "Excpected to receive an AddMsg but received: " + msg); |
| | | } |
| | | catch(SocketTimeoutException e) |
| | | { |
| | | fail("The msg stored in RS1 DB is expected to be received by DS3)"); |
| | | } |
| | | msg = broker3.receive(); |
| | | assertTrue(msg instanceof AddMsg, "Expected to receive an AddMsg but received: " + msg); |
| | | |
| | | debugInfo("DS2 is publishing a change and RS1 must store this change, DS3 must receive it."); |
| | | emsg = (AddMsg)createAddMsg(); |
| | |
| | | Thread.sleep(500); |
| | | checkChangelogSize(2); |
| | | |
| | | try |
| | | { |
| | | ReplicationMsg msg = broker3.receive(); |
| | | |
| | | /* expected */ |
| | | msg = broker3.receive(); |
| | | AddMsg rcvmsg = (AddMsg)msg; |
| | | assertEquals(rcvmsg.getCSN(), emsg.getCSN()); |
| | | } |
| | | catch(SocketTimeoutException e) |
| | | { |
| | | fail("The msg send by DS2 is expected to be received by DS3)"); |
| | | } |
| | | |
| | | //=============================================================== |
| | | debugInfo(testCase + " ** TEST ** General cleaning"); |
| | |
| | | disconnectFromReplServer(changelog1ID); |
| | | |
| | | debugInfo("Successfully ending " + testCase); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | fail(testCase + " Exception:"+ e.getMessage() + " " + |
| | | stackTraceToSingleLineString(e)); |
| | | } finally |
| | | { |
| | | postTest(); |
| | |
| | | int waitRes=0; |
| | | while(waitRes<100) |
| | | { |
| | | if ((replServer1.getGenerationId(baseDn.toNormalizedString())==EMPTY_DN_GENID) |
| | | && (replServer2.getGenerationId(baseDn.toNormalizedString())==EMPTY_DN_GENID) |
| | | && (replServer3.getGenerationId(baseDn.toNormalizedString())==EMPTY_DN_GENID)) |
| | | if (replServer1.getGenerationId(baseDN) == EMPTY_DN_GENID |
| | | && replServer2.getGenerationId(baseDN) == EMPTY_DN_GENID |
| | | && replServer3.getGenerationId(baseDN) == EMPTY_DN_GENID) |
| | | break; |
| | | waitRes++; |
| | | Thread.sleep(100); |
| | | } |
| | | assertEquals(replServer1.getGenerationId(baseDn.toNormalizedString()), EMPTY_DN_GENID, |
| | | " in replServer1"); |
| | | assertEquals(replServer2.getGenerationId(baseDn.toNormalizedString()), EMPTY_DN_GENID, |
| | | " in replServer2"); |
| | | assertEquals(replServer3.getGenerationId(baseDn.toNormalizedString()), EMPTY_DN_GENID, |
| | | " in replServer3"); |
| | | assertEquals(replServer1.getGenerationId(baseDN), EMPTY_DN_GENID, " in replServer1"); |
| | | assertEquals(replServer2.getGenerationId(baseDN), EMPTY_DN_GENID, " in replServer2"); |
| | | assertEquals(replServer3.getGenerationId(baseDN), EMPTY_DN_GENID, " in replServer3"); |
| | | |
| | | debugInfo("Disconnect DS from replServer1."); |
| | | disconnectFromReplServer(changelog1ID); |
| | |
| | | waitRes=0; |
| | | while(waitRes<100) |
| | | { |
| | | if ((replServer1.getGenerationId(baseDn.toNormalizedString())==-1) |
| | | && (replServer2.getGenerationId(baseDn.toNormalizedString())==-1) |
| | | && (replServer3.getGenerationId(baseDn.toNormalizedString())==-1)) |
| | | if (replServer1.getGenerationId(baseDN) == -1 |
| | | && replServer2.getGenerationId(baseDN) == -1 |
| | | && replServer3.getGenerationId(baseDN) == -1) |
| | | break; |
| | | waitRes++; |
| | | Thread.sleep(100); |
| | | } |
| | | debugInfo( |
| | | "Expect genIds to be resetted in all servers to -1 as no more DS in topo - after 10 sec"); |
| | | assertEquals(replServer1.getGenerationId(baseDn.toNormalizedString()), -1); |
| | | assertEquals(replServer2.getGenerationId(baseDn.toNormalizedString()), -1); |
| | | assertEquals(replServer3.getGenerationId(baseDn.toNormalizedString()), -1); |
| | | assertEquals(replServer1.getGenerationId(baseDN), -1); |
| | | assertEquals(replServer2.getGenerationId(baseDN), -1); |
| | | assertEquals(replServer3.getGenerationId(baseDN), -1); |
| | | |
| | | debugInfo("Add entries to DS"); |
| | | addTestEntriesToDB(updatedEntries); |
| | |
| | | waitRes=0; |
| | | while(waitRes<100) |
| | | { |
| | | if ((replServer1.getGenerationId(baseDn.toNormalizedString())==genId) |
| | | && (replServer2.getGenerationId(baseDn.toNormalizedString())==genId) |
| | | && (replServer3.getGenerationId(baseDn.toNormalizedString())==genId)) |
| | | if (replServer1.getGenerationId(baseDN) == genId |
| | | && replServer2.getGenerationId(baseDN) == genId |
| | | && replServer3.getGenerationId(baseDN) == genId) |
| | | break; |
| | | waitRes++; |
| | | Thread.sleep(100); |
| | | } |
| | | assertEquals(replServer1.getGenerationId(baseDn.toNormalizedString()), genId); |
| | | assertEquals(replServer2.getGenerationId(baseDn.toNormalizedString()), genId); |
| | | assertEquals(replServer3.getGenerationId(baseDn.toNormalizedString()), genId); |
| | | assertEquals(replServer1.getGenerationId(baseDN), genId); |
| | | assertEquals(replServer2.getGenerationId(baseDN), genId); |
| | | assertEquals(replServer3.getGenerationId(baseDN), genId); |
| | | |
| | | debugInfo("Connecting broker2 to replServer3 with a good genId"); |
| | | try |
| | | { |
| | | broker2 = openReplicationSession(baseDn, |
| | | server2ID, 100, getChangelogPort(changelog3ID), |
| | | 1000, !emptyOldChanges, genId); |
| | | broker2 = openReplicationSession(baseDN, server2ID, 100, |
| | | getChangelogPort(changelog3ID), 1000, !emptyOldChanges, genId); |
| | | Thread.sleep(1000); |
| | | } catch (SocketException se) |
| | | { |
| | | fail("Broker connection is expected to be accepted."); |
| | | } |
| | | |
| | | debugInfo("Expecting that broker2 is not in bad gen id since it has a correct genId"); |
| | | assertFalse(isDegradedDueToGenerationId(replServer1, server2ID)); |
| | |
| | | waitRes=0; |
| | | while(waitRes<100) |
| | | { |
| | | if ((replServer1.getGenerationId(baseDn.toNormalizedString())==genId) |
| | | && (replServer2.getGenerationId(baseDn.toNormalizedString())==genId) |
| | | && (replServer3.getGenerationId(baseDn.toNormalizedString())==genId)) |
| | | if (replServer1.getGenerationId(baseDN) == genId |
| | | && replServer2.getGenerationId(baseDN) == genId |
| | | && replServer3.getGenerationId(baseDN) == genId) |
| | | break; |
| | | waitRes++; |
| | | Thread.sleep(100); |
| | | } |
| | | assertEquals(replServer1.getGenerationId(baseDn.toNormalizedString()), genId); |
| | | assertEquals(replServer2.getGenerationId(baseDn.toNormalizedString()), genId); |
| | | assertEquals(replServer3.getGenerationId(baseDn.toNormalizedString()), genId); |
| | | assertEquals(replServer1.getGenerationId(baseDN), genId); |
| | | assertEquals(replServer2.getGenerationId(baseDN), genId); |
| | | assertEquals(replServer3.getGenerationId(baseDN), genId); |
| | | |
| | | debugInfo("Connecting broker3 to replServer1 with a bad genId"); |
| | | try |
| | | { |
| | | long badgenId = 1; |
| | | broker3 = openReplicationSession(baseDn, |
| | | server3ID, 100, getChangelogPort(changelog1ID), |
| | | 1000, !emptyOldChanges, badgenId); |
| | | broker3 = openReplicationSession(baseDN, server3ID, 100, |
| | | getChangelogPort(changelog1ID), 1000, !emptyOldChanges, badgenId); |
| | | Thread.sleep(1000); |
| | | } catch (SocketException se) |
| | | { |
| | | fail("Broker connection is expected to be accepted."); |
| | | } |
| | | |
| | | debugInfo("Expecting that broker3 is in bad gen id since it has a bad genId"); |
| | | assertTrue(isDegradedDueToGenerationId(replServer1, server3ID)); |
| | |
| | | |
| | | debugInfo("Verifying that all replservers genIds have been reset."); |
| | | genId = readGenIdFromSuffixRootEntry(); |
| | | assertEquals(replServer1.getGenerationId(baseDn.toNormalizedString()), genId); |
| | | assertEquals(replServer2.getGenerationId(baseDn.toNormalizedString()), genId); |
| | | assertEquals(replServer3.getGenerationId(baseDn.toNormalizedString()), genId); |
| | | assertEquals(replServer1.getGenerationId(baseDN), genId); |
| | | assertEquals(replServer2.getGenerationId(baseDN), genId); |
| | | assertEquals(replServer3.getGenerationId(baseDN), genId); |
| | | |
| | | debugInfo("Adding reset task to DS." + genId); |
| | | taskReset = TestCaseUtils.makeEntry( |
| | |
| | | while(waitRes<100) |
| | | { |
| | | readGenIdFromSuffixRootEntry(); |
| | | if ((replServer1.getGenerationId(baseDn.toNormalizedString())==-1) |
| | | && (replServer2.getGenerationId(baseDn.toNormalizedString())==-1) |
| | | && (replServer3.getGenerationId(baseDn.toNormalizedString())==-1)) |
| | | if (replServer1.getGenerationId(baseDN) == -1 |
| | | && replServer2.getGenerationId(baseDN) == -1 |
| | | && replServer3.getGenerationId(baseDN) == -1) |
| | | break; |
| | | waitRes++; |
| | | Thread.sleep(100); |
| | | } |
| | | assertEquals(replServer1.getGenerationId(baseDn.toNormalizedString()), -1, "test"+i); |
| | | assertEquals(replServer2.getGenerationId(baseDn.toNormalizedString()), -1, "test"+i); |
| | | assertEquals(replServer3.getGenerationId(baseDn.toNormalizedString()), -1, "test"+i); |
| | | assertEquals(replServer1.getGenerationId(baseDN), -1, "test" + i); |
| | | assertEquals(replServer2.getGenerationId(baseDN), -1, "test" + i); |
| | | assertEquals(replServer3.getGenerationId(baseDN), -1, "test" + i); |
| | | |
| | | debugInfo( |
| | | "Disconnect DS from replServer1 (required in order to DEL entries)."); |
| | |
| | | |
| | | private boolean isDegradedDueToGenerationId(ReplicationServer rs, int serverId) |
| | | { |
| | | ReplicationServerDomain domain = rs.getReplicationServerDomain(baseDn.toNormalizedString()); |
| | | ReplicationServerDomain domain = rs.getReplicationServerDomain(baseDN); |
| | | return domain.isDegradedDueToGenerationId(serverId); |
| | | } |
| | | |
| | |
| | | for (int i=0; i< 5; i++) |
| | | { |
| | | long generationId = 1000+i; |
| | | broker = openReplicationSession(baseDn, |
| | | server2ID, 100, getChangelogPort(changelog1ID), |
| | | 1000, !emptyOldChanges, generationId); |
| | | broker = openReplicationSession(baseDN, server2ID, 100, |
| | | getChangelogPort(changelog1ID), 1000, !emptyOldChanges, generationId); |
| | | debugInfo(testCase + " Expect genId to be set in memory on the replication " + |
| | | " server side even if not wrote on disk/db since no change occurred."); |
| | | rgenId = replServer1.getGenerationId(baseDn.toNormalizedString()); |
| | | rgenId = replServer1.getGenerationId(baseDN); |
| | | assertEquals(rgenId, generationId); |
| | | broker.stop(); |
| | | broker = null; |
| | |
| | | private static final String EXAMPLE_DN = "dc=example,dc=com"; |
| | | private static int[] replServerPort = new int[20]; |
| | | |
| | | private DN baseDn; |
| | | private DN baseDN; |
| | | private ReplicationBroker server2; |
| | | private ReplicationBroker server3; |
| | | private ReplicationServer changelog1; |
| | |
| | | log("Setup: debugEnabled:" + debugEnabled()); |
| | | |
| | | // This test suite depends on having the schema available. |
| | | baseDn = DN.decode(EXAMPLE_DN); |
| | | baseDN = DN.decode(EXAMPLE_DN); |
| | | |
| | | // This test uses import tasks which do not work with memory backend |
| | | // (like the test backend we use in every tests): backend is disabled then |
| | |
| | | "Unable to add the synchronized server"); |
| | | configEntryList.add(synchroServerEntry.getDN()); |
| | | |
| | | replDomain = LDAPReplicationDomain.retrievesReplicationDomain(baseDn); |
| | | replDomain = LDAPReplicationDomain.retrievesReplicationDomain(baseDN); |
| | | |
| | | assertTrue(!replDomain.ieRunning(), |
| | | "ReplicationDomain: Import/Export is not expected to be running"); |
| | |
| | | "objectclass: ds-task", |
| | | "objectclass: ds-task-initialize-from-remote-replica", |
| | | "ds-task-class-name: org.opends.server.tasks.InitializeTask", |
| | | "ds-task-initialize-domain-dn: " + baseDn, |
| | | "ds-task-initialize-domain-dn: " + baseDN, |
| | | "ds-task-initialize-replica-server-id: -3"); |
| | | addTask(taskInit, ResultCode.OTHER, |
| | | ERR_INVALID_IMPORT_SOURCE.get(baseDn.toNormalizedString(), |
| | | ERR_INVALID_IMPORT_SOURCE.get(baseDN.toNormalizedString(), |
| | | Integer.toString(server1ID),"-3","")); |
| | | |
| | | // Scope containing a serverID absent from the domain |
| | |
| | | |
| | | private Set<Integer> getConnectedDSServerIds(ReplicationServer changelog) |
| | | { |
| | | ReplicationServerDomain domain = changelog.getReplicationServerDomain(baseDn.toNormalizedString()); |
| | | ReplicationServerDomain domain = changelog.getReplicationServerDomain(baseDN); |
| | | return domain.getConnectedDSs().keySet(); |
| | | } |
| | | |
| | |
| | | log(testCase + " Will connect server 2 to " + changelog2ID); |
| | | server2 = openReplicationSession(DN.decode(EXAMPLE_DN), |
| | | server2ID, 100, getChangelogPort(changelog2ID), |
| | | 1000, emptyOldChanges, |
| | | changelog1.getGenerationId(baseDn.toNormalizedString())); |
| | | 1000, emptyOldChanges, changelog1.getGenerationId(baseDN)); |
| | | } |
| | | |
| | | // Connect a broker acting as server 3 to Repl Server 3 |
| | |
| | | log(testCase + " Will connect server 3 to " + changelog3ID); |
| | | server3 = openReplicationSession(DN.decode(EXAMPLE_DN), |
| | | server3ID, 100, getChangelogPort(changelog3ID), |
| | | 1000, emptyOldChanges, |
| | | changelog1.getGenerationId(baseDn.toNormalizedString())); |
| | | 1000, emptyOldChanges, changelog1.getGenerationId(baseDN)); |
| | | } |
| | | |
| | | // Thread.sleep(500); |
| | |
| | | "objectclass: ds-task", |
| | | "objectclass: ds-task-initialize-from-remote-replica", |
| | | "ds-task-class-name: org.opends.server.tasks.InitializeTask", |
| | | "ds-task-initialize-domain-dn: " + baseDn, |
| | | "ds-task-initialize-domain-dn: " + baseDN, |
| | | "ds-task-initialize-replica-server-id: " + 20); |
| | | |
| | | addTask(taskInit, ResultCode.SUCCESS, null); |
| | | |
| | | waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR, |
| | | ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get( |
| | | baseDn.toString(), "20")); |
| | | baseDN.toString(), "20")); |
| | | |
| | | // Test 2 |
| | | taskInit = TestCaseUtils.makeEntry( |
| | |
| | | "objectclass: ds-task", |
| | | "objectclass: ds-task-initialize-from-remote-replica", |
| | | "ds-task-class-name: org.opends.server.tasks.InitializeTask", |
| | | "ds-task-initialize-domain-dn: " + baseDn, |
| | | "ds-task-initialize-domain-dn: " + baseDN, |
| | | "ds-task-initialize-replica-server-id: " + server1ID); |
| | | |
| | | addTask(taskInit, ResultCode.OTHER, ERR_INVALID_IMPORT_SOURCE.get( |
| | | baseDn.toNormalizedString(), |
| | | Integer.toString(server1ID),"20","")); |
| | | baseDN.toNormalizedString(), Integer.toString(server1ID),"20","")); |
| | | |
| | | if (replDomain != null) |
| | | { |
| | |
| | | @Test(enabled=false) |
| | | public void initializeTargetNoTarget() throws Exception |
| | | { |
| | | String testCase = "initializeTargetNoTarget" + baseDn; |
| | | String testCase = "initializeTargetNoTarget" + baseDN; |
| | | log("Starting "+testCase); |
| | | |
| | | try |
| | |
| | | "objectclass: ds-task", |
| | | "objectclass: ds-task-initialize-remote-replica", |
| | | "ds-task-class-name: org.opends.server.tasks.InitializeTargetTask", |
| | | "ds-task-initialize-domain-dn: " + baseDn, |
| | | "ds-task-initialize-domain-dn: " + baseDN, |
| | | "ds-task-initialize-replica-server-id: " + 0); |
| | | |
| | | addTask(taskInit, ResultCode.SUCCESS, null); |
| | | |
| | | waitTaskState(taskInit, TaskState.STOPPED_BY_ERROR, |
| | | ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(baseDn.toString(), "0")); |
| | | ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(baseDN.toString(), "0")); |
| | | |
| | | if (replDomain != null) |
| | | { |
| | |
| | | "objectclass: ds-task", |
| | | "objectclass: ds-task-initialize-from-remote-replica", |
| | | "ds-task-class-name: org.opends.server.tasks.InitializeTask", |
| | | "ds-task-initialize-domain-dn: " + baseDn, |
| | | "ds-task-initialize-domain-dn: " + baseDN, |
| | | "ds-task-initialize-replica-server-id: " + server2ID); |
| | | |
| | | addTask(taskInit, ResultCode.SUCCESS, null); |
| | |
| | | "objectclass: ds-task", |
| | | "objectclass: ds-task-initialize-from-remote-replica", |
| | | "ds-task-class-name: org.opends.server.tasks.InitializeTask", |
| | | "ds-task-initialize-domain-dn: " + baseDn, |
| | | "ds-task-initialize-domain-dn: " + baseDN, |
| | | "ds-task-initialize-replica-server-id: " + server2ID); |
| | | |
| | | // Second task is expected to be rejected |
| | |
| | | */ |
| | | package org.opends.server.replication; |
| | | |
| | | import java.io.File; |
| | | import java.net.SocketException; |
| | | import java.util.ArrayList; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.NoSuchElementException; |
| | | import java.util.concurrent.locks.Lock; |
| | | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.Severity; |
| | |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import java.io.File; |
| | | import java.net.SocketException; |
| | | import java.util.ArrayList; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.NoSuchElementException; |
| | | import java.util.concurrent.locks.Lock; |
| | | |
| | | import static org.opends.server.config.ConfigConstants.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | |
| | | /** |
| | | * Replication monitor stats |
| | | */ |
| | | private DN monitorDn; |
| | | private DN monitorDN; |
| | | private String monitorAttr; |
| | | private long lastCount; |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the domain associated to the baseDn, and the value of the generationId |
| | | * Retrieves the domain associated to the baseDN, and the value of the generationId |
| | | * of this domain. If the domain does not exist, returns the default hard-coded\ |
| | | * value of the generationId corresponding to test backend with its default |
| | | * initial o=test root root entry. |
| | | * |
| | | * @param baseDn The baseDn for which we want the generationId |
| | | * @param baseDN The baseDN for which we want the generationId |
| | | * @return The value of the generationId. |
| | | */ |
| | | static protected long getGenerationId(DN baseDn) |
| | | static protected long getGenerationId(DN baseDN) |
| | | { |
| | | // This is the value of the generationId computed by the server when the |
| | | // test suffix (o=test) has only the root entry created. |
| | | long genId = TEST_DN_WITH_ROOT_ENTRY_GENID; |
| | | try |
| | | { |
| | | LDAPReplicationDomain replDomain = LDAPReplicationDomain.retrievesReplicationDomain(baseDn); |
| | | LDAPReplicationDomain replDomain = LDAPReplicationDomain.retrievesReplicationDomain(baseDN); |
| | | genId = replDomain.getGenerationID(); |
| | | } |
| | | catch(Exception e) {} |
| | |
| | | * does not exist, take the 'empty backend' generationID. |
| | | */ |
| | | protected ReplicationBroker openReplicationSession( |
| | | final DN baseDn, int serverId, int window_size, |
| | | final DN baseDN, int serverId, int window_size, |
| | | int port, int timeout, boolean emptyOldChanges) |
| | | throws Exception, SocketException |
| | | { |
| | | return openReplicationSession(baseDn, serverId, window_size, |
| | | port, timeout, emptyOldChanges, getGenerationId(baseDn), null); |
| | | return openReplicationSession(baseDN, serverId, window_size, |
| | | port, timeout, emptyOldChanges, getGenerationId(baseDN), null); |
| | | } |
| | | |
| | | /** |
| | |
| | | * providing the generationId. |
| | | */ |
| | | protected ReplicationBroker openReplicationSession( |
| | | final DN baseDn, int serverId, int window_size, |
| | | final DN baseDN, int serverId, int window_size, |
| | | int port, int timeout, boolean emptyOldChanges, |
| | | long generationId) |
| | | throws Exception, SocketException |
| | | { |
| | | return openReplicationSession(baseDn, serverId, window_size, |
| | | return openReplicationSession(baseDN, serverId, window_size, |
| | | port, timeout, emptyOldChanges, generationId, null); |
| | | } |
| | | |
| | |
| | | * providing the generationId. |
| | | */ |
| | | protected ReplicationBroker openReplicationSession( |
| | | final DN baseDn, int serverId, int window_size, |
| | | final DN baseDN, int serverId, int window_size, |
| | | int port, int timeout, boolean emptyOldChanges, |
| | | long generationId, ReplicationDomain replicationDomain) |
| | | throws Exception, SocketException |
| | |
| | | ServerState state = new ServerState(); |
| | | |
| | | if (emptyOldChanges) |
| | | new PersistentServerState(baseDn, serverId, new ServerState()); |
| | | new PersistentServerState(baseDN, serverId, new ServerState()); |
| | | |
| | | ReplicationBroker broker = new ReplicationBroker(replicationDomain, |
| | | state, baseDn.toNormalizedString(), serverId, window_size, |
| | | state, baseDN, serverId, window_size, |
| | | generationId, 100000, getReplSessionSecurity(), (byte)1, 500); |
| | | ArrayList<String> servers = new ArrayList<String>(1); |
| | | List<String> servers = new ArrayList<String>(1); |
| | | servers.add("localhost:" + port); |
| | | broker.start(servers); |
| | | if (timeout != 0) |
| | |
| | | /** |
| | | * Open a replicationServer session to the local ReplicationServer |
| | | * with a default value generationId. |
| | | * |
| | | */ |
| | | protected ReplicationBroker openReplicationSession( |
| | | final DN baseDn, int serverId, int window_size, |
| | | final DN baseDN, int serverId, int window_size, |
| | | int port, int timeout, ServerState state) |
| | | throws Exception, SocketException |
| | | { |
| | | return openReplicationSession(baseDn, serverId, window_size, |
| | | port, timeout, state, getGenerationId(baseDn)); |
| | | return openReplicationSession(baseDN, serverId, window_size, |
| | | port, timeout, state, getGenerationId(baseDN)); |
| | | } |
| | | |
| | | /** |
| | |
| | | * starting with a given ServerState. |
| | | */ |
| | | protected ReplicationBroker openReplicationSession( |
| | | final DN baseDn, int serverId, int window_size, |
| | | final DN baseDN, int serverId, int window_size, |
| | | int port, int timeout, ServerState state, long generationId) |
| | | throws Exception, SocketException |
| | | { |
| | | ReplicationBroker broker = new ReplicationBroker(null, |
| | | state, baseDn.toNormalizedString(), serverId, window_size, generationId, |
| | | state, baseDN, serverId, window_size, generationId, |
| | | 100000, getReplSessionSecurity(), (byte)1, 500); |
| | | ArrayList<String> servers = new ArrayList<String>(1); |
| | | List<String> servers = new ArrayList<String>(1); |
| | | servers.add("localhost:" + port); |
| | | broker.start(servers); |
| | | checkConnection(30, broker, port); |
| | |
| | | /** |
| | | * Open a replicationServer session with flow control to the local |
| | | * ReplicationServer. |
| | | * |
| | | */ |
| | | protected ReplicationBroker openReplicationSession( |
| | | final DN baseDn, int serverId, int window_size, |
| | | final DN baseDN, int serverId, int window_size, |
| | | int port, int timeout, int maxSendQueue, int maxRcvQueue, |
| | | boolean emptyOldChanges) |
| | | throws Exception, SocketException |
| | |
| | | ServerState state = new ServerState(); |
| | | |
| | | if (emptyOldChanges) |
| | | new PersistentServerState(baseDn, serverId, new ServerState()); |
| | | new PersistentServerState(baseDN, serverId, new ServerState()); |
| | | |
| | | ReplicationBroker broker = new ReplicationBroker(null, |
| | | state, baseDn.toNormalizedString(), serverId, window_size, |
| | | getGenerationId(baseDn), 0, getReplSessionSecurity(), (byte)1, 500); |
| | | ArrayList<String> servers = new ArrayList<String>(1); |
| | | state, baseDN, serverId, window_size, |
| | | getGenerationId(baseDN), 0, getReplSessionSecurity(), (byte)1, 500); |
| | | List<String> servers = new ArrayList<String>(1); |
| | | servers.add("localhost:" + port); |
| | | broker.start(servers); |
| | | checkConnection(30, broker, port); |
| | |
| | | * @return The monitor value |
| | | * @throws Exception If an error occurs. |
| | | */ |
| | | protected long getMonitorAttrValue(DN baseDn, String attr) throws Exception |
| | | protected long getMonitorAttrValue(DN baseDN, String attr) throws Exception |
| | | { |
| | | String monitorFilter = |
| | | "(&(cn=Directory server*)(domain-name=" + baseDn + "))"; |
| | | String monitorFilter = "(&(cn=Directory server*)(domain-name=" + baseDN + "))"; |
| | | |
| | | InternalSearchOperation op; |
| | | int count = 0; |
| | |
| | | try |
| | | { |
| | | Entry entry = DirectoryServer.getEntry(dn); |
| | | if (entry == null) |
| | | return null; |
| | | else |
| | | if (entry != null) |
| | | return entry.duplicate(true); |
| | | return null; |
| | | } |
| | | finally |
| | | { |
| | |
| | | /** |
| | | * Update the monitor count for the specified monitor attribute. |
| | | */ |
| | | protected void updateMonitorCount(DN baseDn, String attr) { |
| | | monitorDn = baseDn; |
| | | protected void updateMonitorCount(DN baseDN, String attr) throws Exception |
| | | { |
| | | monitorDN = baseDN; |
| | | monitorAttr = attr; |
| | | try |
| | | { |
| | | lastCount = getMonitorAttrValue(baseDn, attr); |
| | | } |
| | | catch (Exception ex) |
| | | { |
| | | ex.printStackTrace(); |
| | | assertTrue(false); |
| | | } |
| | | lastCount = getMonitorAttrValue(baseDN, attr); |
| | | } |
| | | |
| | | /** |
| | |
| | | protected long getMonitorDelta() { |
| | | long delta = 0; |
| | | try { |
| | | long currentCount = getMonitorAttrValue(monitorDn, monitorAttr); |
| | | long currentCount = getMonitorAttrValue(monitorDN, monitorAttr); |
| | | delta = (currentCount - lastCount); |
| | | lastCount = currentCount; |
| | | } catch (Exception ex) { |
| | |
| | | // Check that the task contains some log messages. |
| | | AttributeType logMessagesType = DirectoryServer.getAttributeType( |
| | | ATTR_TASK_LOG_MESSAGES.toLowerCase()); |
| | | ArrayList<String> logMessages = new ArrayList<String>(); |
| | | List<String> logMessages = new ArrayList<String>(); |
| | | resultEntry.getAttributeValues(logMessagesType, |
| | | DirectoryStringSyntax.DECODER, |
| | | logMessages); |
| | |
| | | List<String> replicationServers = new ArrayList<String>(); |
| | | replicationServers.add("localhost:" + replServerPort); |
| | | |
| | | replicationDomain = |
| | | new FakeReplicationDomain((firstBackend ? TEST_ROOT_DN_STRING |
| | | : TEST2_ROOT_DN_STRING), DS2_ID, replicationServers, 100, 1000, |
| | | generationId); |
| | | DN baseDN = DN.decode(firstBackend ? TEST_ROOT_DN_STRING : TEST2_ROOT_DN_STRING); |
| | | replicationDomain = new FakeReplicationDomain(baseDN, DS2_ID, replicationServers, 100, 1000, generationId); |
| | | |
| | | // Test connection |
| | | assertTrue(replicationDomain.isConnected()); |
| | |
| | | private long generationID = -1; |
| | | |
| | | public FakeReplicationDomain( |
| | | String serviceID, |
| | | DN baseDN, |
| | | int serverID, |
| | | Collection<String> replicationServers, |
| | | int window, |
| | | long heartbeatInterval, |
| | | long generationId) throws ConfigException |
| | | { |
| | | super(serviceID, serverID, 100); |
| | | super(baseDN, serverID, 100); |
| | | generationID = generationId; |
| | | startPublishService(replicationServers, window, heartbeatInterval, 500); |
| | | startListenService(); |
| | |
| | | { |
| | | |
| | | private static final String EXAMPLE_DN = "dc=example,dc=com"; // Server id definitions |
| | | private static DN EXAMPLE_DN_; |
| | | |
| | | private static final int DS1_ID = 1; |
| | | private static final int DS2_ID = 2; |
| | |
| | | } |
| | | |
| | | // Clear any reference to a domain in synchro plugin |
| | | MultimasterReplication.deleteDomain(DN.decode(EXAMPLE_DN)); |
| | | MultimasterReplication.deleteDomain(EXAMPLE_DN_); |
| | | |
| | | if (ds2 != null) |
| | | { |
| | |
| | | SortedSet<String> replServers = new TreeSet<String>(); |
| | | replServers.add("localhost:" + rs1Port); |
| | | |
| | | DN baseDn = DN.decode(EXAMPLE_DN); |
| | | DomainFakeCfg domainConf = new DomainFakeCfg(baseDn, dsId, replServers); |
| | | DomainFakeCfg domainConf = new DomainFakeCfg(EXAMPLE_DN_, dsId, replServers); |
| | | LDAPReplicationDomain replicationDomain = MultimasterReplication.createNewDomain(domainConf); |
| | | replicationDomain.start(); |
| | | SynchronizationProvider<SynchronizationProviderCfg> provider = |
| | |
| | | private ReplicationBroker createReplicationBroker(int dsId, |
| | | ServerState state, long generationId) throws Exception |
| | | { |
| | | ReplicationBroker broker = new ReplicationBroker(null, |
| | | state, EXAMPLE_DN, dsId, 100, generationId, 0, |
| | | new ReplSessionSecurity(null, null, null, true), (byte) 1, 500); |
| | | ReplSessionSecurity security = new ReplSessionSecurity(null, null, null, true); |
| | | ReplicationBroker broker = new ReplicationBroker(null, state, EXAMPLE_DN_, |
| | | dsId, 100, generationId, 0, security, (byte) 1, 500); |
| | | List<String> servers = new ArrayList<String>(1); |
| | | servers.add("localhost:" + rs1Port); |
| | | broker.start(servers); |
| | |
| | | public void setUp() throws Exception |
| | | { |
| | | super.setUp(); |
| | | EXAMPLE_DN_ = DN.decode(EXAMPLE_DN); |
| | | |
| | | // Note: this test does not use the memory test backend as for having a DS |
| | | // going into degraded status, we need to send a lot of updates. This makes |
| | | // the memory test backend crash with OutOfMemoryError. So we prefer here |
| | | // a backend backed up with a file |
| | | TestCaseUtils.clearJEBackend(false, "userRoot", EXAMPLE_DN); |
| | | |
| | | } |
| | | |
| | | /** |
| | |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.replication.service.ReplicationDomain; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.testng.annotations.BeforeClass; |
| | |
| | | int rsPort = getRsPort(rsId); |
| | | |
| | | FakeReplicationDomain fakeReplicationDomain = new FakeReplicationDomain( |
| | | TEST_ROOT_DN_STRING, serverId, generationId, |
| | | DN.decode(TEST_ROOT_DN_STRING), serverId, generationId, |
| | | (byte)groupId, assured, assuredMode, (byte)safeDataLevel, assuredTimeout, |
| | | scenario, serverState); |
| | | |
| | |
| | | * @throws org.opends.server.config.ConfigException |
| | | */ |
| | | public FakeReplicationDomain( |
| | | String baseDN, |
| | | DN baseDN, |
| | | int serverID, |
| | | long generationId, |
| | | byte groupId, |
| | |
| | | import java.util.*; |
| | | |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.admin.std.server.ExternalChangelogDomainCfg; |
| | | import org.opends.server.api.Backend; |
| | | import org.opends.server.api.ConnectionHandler; |
| | | import org.opends.server.backends.MemoryBackend; |
| | |
| | | /** The port of the replicationServer. */ |
| | | private int replicationServerPort; |
| | | |
| | | /** base DN for "o=test" */ |
| | | private static DN TEST_ROOT_DN; |
| | | /** base DN for "o=test2" */ |
| | | private static DN TEST_ROOT_DN2; |
| | | |
| | | private static final String TEST_BACKEND_ID2 = "test2"; |
| | | private static final String TEST_ROOT_DN_STRING2 = "o=" + TEST_BACKEND_ID2; |
| | | |
| | |
| | | public void setUp() throws Exception |
| | | { |
| | | super.setUp(); |
| | | TEST_ROOT_DN = DN.decode(TEST_ROOT_DN_STRING); |
| | | TEST_ROOT_DN2 = DN.decode(TEST_ROOT_DN_STRING2); |
| | | |
| | | // This test suite depends on having the schema available. |
| | | configure(); |
| | |
| | | try |
| | | { |
| | | // create 2 regular brokers on the 2 suffixes |
| | | server01 = openReplicationSession( |
| | | DN.decode(TEST_ROOT_DN_STRING), SERVER_ID_1, |
| | | server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1, |
| | | 100, replicationServerPort, brokerSessionTimeout, true); |
| | | |
| | | server02 = openReplicationSession( |
| | | DN.decode(TEST_ROOT_DN_STRING2), SERVER_ID_2, |
| | | server02 = openReplicationSession(TEST_ROOT_DN2, SERVER_ID_2, |
| | | 100, replicationServerPort, brokerSessionTimeout, true, EMPTY_DN_GENID); |
| | | |
| | | // create and publish 1 change on each suffix |
| | |
| | | |
| | | try |
| | | { |
| | | DN baseDn2 = DN.decode(TEST_ROOT_DN_STRING2); |
| | | |
| | | server01 = openReplicationSession( |
| | | DN.decode(TEST_ROOT_DN_STRING), SERVER_ID_1, |
| | | server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1, |
| | | 100, replicationServerPort, brokerSessionTimeout, true); |
| | | |
| | | // create and publish 1 change on each suffix |
| | | long time = TimeThread.getTime(); |
| | | |
| | | CSN csn1 = new CSN(time, 1, SERVER_ID_1); |
| | | DeleteMsg delMsg1 = |
| | | new DeleteMsg("o=" + tn + "1," + TEST_ROOT_DN_STRING, csn1, |
| | | "ECLBasicMsg1uid"); |
| | | DeleteMsg delMsg1 = new DeleteMsg("o=" + tn + "1," + TEST_ROOT_DN_STRING, csn1, "ECLBasicMsg1uid"); |
| | | server01.publish(delMsg1); |
| | | debugInfo(tn, "publishes:" + delMsg1); |
| | | |
| | |
| | | backend2.setPrivateBackend(true); |
| | | SortedSet<String> replServers = newSet("localhost:" + replicationServerPort); |
| | | |
| | | DomainFakeCfg domainConf = |
| | | new DomainFakeCfg(baseDn2, 1602, replServers); |
| | | DomainFakeCfg domainConf = new DomainFakeCfg(TEST_ROOT_DN2, 1602, replServers); |
| | | domain2 = startNewDomain(domainConf, null,null); |
| | | |
| | | sleep(1000); |
| | | addEntry(createEntry(baseDn2)); |
| | | addEntry(createEntry(TEST_ROOT_DN2)); |
| | | sleep(2000); |
| | | |
| | | // Search on ECL from start on all suffixes |
| | |
| | | assertThat(entries).hasSize(2); |
| | | debugAndWriteEntries(null, entries, tn); |
| | | |
| | | ExternalChangelogDomainFakeCfg eclCfg = |
| | | new ExternalChangelogDomainFakeCfg(false, null, null); |
| | | ExternalChangelogDomainCfg eclCfg = new ExternalChangelogDomainFakeCfg(false, null, null); |
| | | domainConf.setExternalChangelogDomain(eclCfg); |
| | | domain2.applyConfigurationChange(domainConf); |
| | | |
| | |
| | | |
| | | LDIFWriter ldifWriter = getLDIFWriter(); |
| | | |
| | | s1test = openReplicationSession( |
| | | DN.decode(TEST_ROOT_DN_STRING), SERVER_ID_1, |
| | | s1test = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1, |
| | | 100, replicationServerPort, brokerSessionTimeout, true); |
| | | |
| | | s2test2 = openReplicationSession( |
| | | DN.decode(TEST_ROOT_DN_STRING2), SERVER_ID_2, |
| | | s2test2 = openReplicationSession(TEST_ROOT_DN2, SERVER_ID_2, |
| | | 100, replicationServerPort, brokerSessionTimeout, true, EMPTY_DN_GENID); |
| | | sleep(500); |
| | | |
| | |
| | | |
| | | // Test startState ("first cookie") of the ECL |
| | | // -- |
| | | s1test2 = openReplicationSession( |
| | | DN.decode(TEST_ROOT_DN_STRING2), 1203, |
| | | s1test2 = openReplicationSession(TEST_ROOT_DN2, 1203, |
| | | 100, replicationServerPort, brokerSessionTimeout, true, EMPTY_DN_GENID); |
| | | |
| | | s2test = openReplicationSession( |
| | | DN.decode(TEST_ROOT_DN_STRING), 1204, |
| | | s2test = openReplicationSession(TEST_ROOT_DN, 1204, |
| | | 100, replicationServerPort, brokerSessionTimeout, true); |
| | | sleep(500); |
| | | |
| | |
| | | publishDeleteMsgInOTest(s2test, csn9, tn, 9); |
| | | sleep(500); |
| | | |
| | | ServerState startState = getReplicationDomainStartState(TEST_ROOT_DN_STRING); |
| | | ServerState startState = getReplicationDomainStartState(TEST_ROOT_DN); |
| | | assertEquals(startState.getCSN(s1test.getServerId()).getSeqnum(), 1); |
| | | assertTrue(startState.getCSN(s2test.getServerId()) != null); |
| | | assertEquals(startState.getCSN(s2test.getServerId()).getSeqnum(), 7); |
| | | |
| | | startState = getReplicationDomainStartState(TEST_ROOT_DN_STRING2); |
| | | startState = getReplicationDomainStartState(TEST_ROOT_DN2); |
| | | assertEquals(startState.getCSN(s2test2.getServerId()).getSeqnum(), 2); |
| | | assertEquals(startState.getCSN(s1test2.getServerId()).getSeqnum(), 6); |
| | | |
| | |
| | | debugInfo(tn, "Ending test successfully"); |
| | | } |
| | | |
| | | private ServerState getReplicationDomainStartState(String baseDn) |
| | | private ServerState getReplicationDomainStartState(DN baseDN) |
| | | { |
| | | return replicationServer.getReplicationServerDomain(baseDn).getStartState(); |
| | | return replicationServer.getReplicationServerDomain(baseDN).getStartState(); |
| | | } |
| | | |
| | | private String getCookie(List<SearchResultEntry> entries, |
| | |
| | | // 1. Populate the changelog and read the cookie |
| | | |
| | | // Creates broker on o=test |
| | | server01 = openReplicationSession( |
| | | DN.decode(TEST_ROOT_DN_STRING), SERVER_ID_1, |
| | | server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1, |
| | | 100, replicationServerPort, brokerSessionTimeout, true); |
| | | |
| | | final CSN[] csns = generateCSNs(4, SERVER_ID_1); |
| | |
| | | // returns the appropriate error. |
| | | publishDeleteMsgInOTest(server01, csns[3], tn, 1); |
| | | |
| | | debugInfo(tn, "d1 trimdate" + getReplicationDomainStartState("o=test")); |
| | | debugInfo(tn, "d2 trimdate" + getReplicationDomainStartState("o=test2")); |
| | | debugInfo(tn, "d1 trimdate" + getReplicationDomainStartState(TEST_ROOT_DN)); |
| | | debugInfo(tn, "d2 trimdate" + getReplicationDomainStartState(TEST_ROOT_DN2)); |
| | | searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, tn, UNWILLING_TO_PERFORM); |
| | | assertEquals(searchOp.getSearchEntries().size(), 0); |
| | | assertTrue(searchOp.getErrorMessage().toString().startsWith( |
| | | ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get("o=test").toString()), |
| | | ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(TEST_ROOT_DN_STRING).toString()), |
| | | searchOp.getErrorMessage().toString()); |
| | | } |
| | | finally |
| | |
| | | LDIFWriter ldifWriter = getLDIFWriter(); |
| | | |
| | | // Creates broker on o=test |
| | | server01 = openReplicationSession( |
| | | DN.decode(TEST_ROOT_DN_STRING), SERVER_ID_1, |
| | | server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1, |
| | | 100, replicationServerPort, brokerSessionTimeout, true); |
| | | |
| | | // Creates broker on o=test2 |
| | | server02 = openReplicationSession( |
| | | DN.decode(TEST_ROOT_DN_STRING2), SERVER_ID_2, |
| | | server02 = openReplicationSession(TEST_ROOT_DN2, SERVER_ID_2, |
| | | 100, replicationServerPort, brokerSessionTimeout, true); |
| | | |
| | | String user1entryUUID = "11111111-1111-1111-1111-111111111111"; |
| | |
| | | |
| | | // Publish modDN |
| | | csnCounter++; |
| | | final DN newSuperior = DN.decode(TEST_ROOT_DN_STRING2); |
| | | final DN newSuperior = TEST_ROOT_DN2; |
| | | ModifyDNOperation op = new ModifyDNOperationBasis(connection, 1, 1, null, |
| | | DN.decode("uid="+tn+"4," + TEST_ROOT_DN_STRING), // entryDN |
| | | RDN.decode("uid="+tn+"new4"), // new rdn |
| | |
| | | |
| | | { |
| | | // Create broker on suffix |
| | | ReplicationBroker server01 = openReplicationSession( |
| | | DN.decode(TEST_ROOT_DN_STRING), SERVER_ID_1, |
| | | ReplicationBroker server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1, |
| | | 100, replicationServerPort, brokerSessionTimeout, true); |
| | | |
| | | CSN[] csns = generateCSNs(2, SERVER_ID_1); |
| | |
| | | try |
| | | { |
| | | // Create broker on o=test |
| | | server01 = openReplicationSession( |
| | | DN.decode(TEST_ROOT_DN_STRING), SERVER_ID_1, |
| | | server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1, |
| | | 100, replicationServerPort, brokerSessionTimeout, true); |
| | | server01.setChangeTimeHeartbeatInterval(100); //ms |
| | | |
| | | // Create broker on o=test2 |
| | | server02 = openReplicationSession( |
| | | DN.decode(TEST_ROOT_DN_STRING2), SERVER_ID_2, |
| | | server02 = openReplicationSession(TEST_ROOT_DN2, SERVER_ID_2, |
| | | 100, replicationServerPort, brokerSessionTimeout, true, EMPTY_DN_GENID); |
| | | server02.setChangeTimeHeartbeatInterval(100); //ms |
| | | |
| | |
| | | backend2 = initializeTestBackend(true, TEST_BACKEND_ID2); |
| | | |
| | | // -- |
| | | s1test = openReplicationSession( |
| | | DN.decode(TEST_ROOT_DN_STRING), SERVER_ID_1, |
| | | s1test = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1, |
| | | 100, replicationServerPort, brokerSessionTimeout, true); |
| | | |
| | | s2test2 = openReplicationSession( |
| | | DN.decode(TEST_ROOT_DN_STRING2), SERVER_ID_2, |
| | | s2test2 = openReplicationSession(TEST_ROOT_DN2, SERVER_ID_2, |
| | | 100, replicationServerPort, brokerSessionTimeout, true, EMPTY_DN_GENID); |
| | | sleep(500); |
| | | |
| | |
| | | sleep(500); |
| | | |
| | | // -- |
| | | s1test2 = openReplicationSession( |
| | | DN.decode(TEST_ROOT_DN_STRING2), 1203, |
| | | s1test2 = openReplicationSession(TEST_ROOT_DN2, 1203, |
| | | 100, replicationServerPort, brokerSessionTimeout, true, EMPTY_DN_GENID); |
| | | |
| | | s2test = openReplicationSession( |
| | | DN.decode(TEST_ROOT_DN_STRING), 1204, |
| | | s2test = openReplicationSession(TEST_ROOT_DN, 1204, |
| | | 100, replicationServerPort, brokerSessionTimeout, true); |
| | | sleep(500); |
| | | |
| | |
| | | publishDeleteMsgInOTest(s2test, csn9, tn, 9); |
| | | sleep(500); |
| | | |
| | | ReplicationServerDomain rsd1 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING); |
| | | ReplicationServerDomain rsd1 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN); |
| | | rsd1.getDbServerState(); |
| | | rsd1.getChangeTimeHeartbeatState(); |
| | | debugInfo(tn, rsd1.getBaseDn() |
| | | debugInfo(tn, rsd1.getBaseDN() |
| | | + " DbServerState=" + rsd1.getDbServerState() |
| | | + " ChangeTimeHeartBeatState=" + rsd1.getChangeTimeHeartbeatState() |
| | | + " eligibleCSN=" + rsd1.getEligibleCSN() |
| | | + " rs eligibleCSN=" + replicationServer.getEligibleCSN()); |
| | | // FIXME:ECL Enable this test by adding an assert on the right value |
| | | |
| | | ReplicationServerDomain rsd2 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING2); |
| | | ReplicationServerDomain rsd2 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN2); |
| | | rsd2.getDbServerState(); |
| | | rsd2.getChangeTimeHeartbeatState(); |
| | | debugInfo(tn, rsd2.getBaseDn() |
| | | debugInfo(tn, rsd2.getBaseDN() |
| | | + " DbServerState=" + rsd2.getDbServerState() |
| | | + " ChangeTimeHeartBeatState=" + rsd2.getChangeTimeHeartbeatState() |
| | | + " eligibleCSN=" + rsd2.getEligibleCSN() |
| | |
| | | LDIFWriter ldifWriter = getLDIFWriter(); |
| | | |
| | | // Creates broker on o=test |
| | | ReplicationBroker server01 = openReplicationSession( |
| | | DN.decode(TEST_ROOT_DN_STRING), SERVER_ID_1, |
| | | ReplicationBroker server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1, |
| | | 100, replicationServerPort, brokerSessionTimeout, true); |
| | | |
| | | String user1entryUUID = "11111111-1112-1113-1114-111111111115"; |
| | |
| | | DN.decode("uid="+tn+"4," + TEST_ROOT_DN_STRING), // entryDN |
| | | RDN.decode("uid="+tn+"new4"), // new rdn |
| | | true, // deleteoldrdn |
| | | DN.decode(TEST_ROOT_DN_STRING2)); // new superior |
| | | TEST_ROOT_DN2); // new superior |
| | | op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csns[3], user1entryUUID, "newparentId")); |
| | | LocalBackendModifyDNOperation localOp = new LocalBackendModifyDNOperation(op); |
| | | ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp); |
| | |
| | | LDIFWriter ldifWriter = getLDIFWriter(); |
| | | |
| | | // Creates broker on o=test |
| | | ReplicationBroker server01 = |
| | | openReplicationSession(DN.decode(TEST_ROOT_DN_STRING), SERVER_ID_1, |
| | | ReplicationBroker server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1, |
| | | 100, replicationServerPort, brokerSessionTimeout, true); |
| | | |
| | | String user1entryUUID = "11111111-1112-1113-1114-111111111115"; |
| | |
| | | debugInfo(tn, "Starting test\n\n"); |
| | | |
| | | // Creates broker on o=test |
| | | ReplicationBroker server01 = |
| | | openReplicationSession(DN.decode(TEST_ROOT_DN_STRING), SERVER_ID_1, 100, |
| | | ReplicationBroker server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1, 100, |
| | | replicationServerPort, brokerSessionTimeout, true); |
| | | |
| | | String filter = "(changenumber=" + firstChangeNumber + ")"; |
| | |
| | | ECLCompatTestLimits(expectedFirst, expectedLast, true); |
| | | |
| | | // Creates broker on o=test |
| | | ReplicationBroker server01 = |
| | | openReplicationSession(DN.decode(TEST_ROOT_DN_STRING), SERVER_ID_1, 100, |
| | | ReplicationBroker server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1, 100, |
| | | replicationServerPort, brokerSessionTimeout, true); |
| | | |
| | | String user1entryUUID = "11111111-1112-1113-1114-111111111115"; |
| | |
| | | final CSN csn2 = csns[1]; |
| | | final CSN csn3 = csns[2]; |
| | | |
| | | ReplicationServerDomain rsdtest = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING); |
| | | ReplicationServerDomain rsdtest = replicationServer.getReplicationServerDomain(TEST_ROOT_DN); |
| | | // this empty state will force to count from the start of the DB |
| | | final ServerState fromStart = new ServerState(); |
| | | |
| | |
| | | assertEquals(rsdtest.getEligibleCount(fromStart, csns[0]), 0); |
| | | |
| | | // Creates broker on o=test |
| | | ReplicationBroker server01 = openReplicationSession( |
| | | DN.decode(TEST_ROOT_DN_STRING), SERVER_ID_1, |
| | | ReplicationBroker server01 = openReplicationSession(TEST_ROOT_DN, SERVER_ID_1, |
| | | 1000, replicationServerPort, brokerSessionTimeout, true); |
| | | |
| | | // Publish one first message |
| | |
| | | // Configure replication on this backend |
| | | // Add the root entry in the backend |
| | | backend2 = initializeTestBackend(false, TEST_BACKEND_ID2); |
| | | DN baseDn2 = DN.decode(TEST_ROOT_DN_STRING2); |
| | | |
| | | SortedSet<String> replServers = newSet("localhost:" + replicationServerPort); |
| | | |
| | | // on o=test2,sid=1702 include attrs set to : 'sn' |
| | | SortedSet<String> eclInclude = newSet("sn", "roomnumber"); |
| | | |
| | | DomainFakeCfg domainConf = new DomainFakeCfg(baseDn2, 1702, replServers); |
| | | DomainFakeCfg domainConf = new DomainFakeCfg(TEST_ROOT_DN2, 1702, replServers); |
| | | domain2 = startNewDomain(domainConf, eclInclude, eclInclude); |
| | | |
| | | backend3 = initializeTestBackend(false, TEST_BACKEND_ID3); |
| | |
| | | // on o=test2,sid=1704 include attrs set to : 'cn' |
| | | eclInclude = newSet("cn"); |
| | | |
| | | domainConf = new DomainFakeCfg(baseDn2, 1704, replServers); |
| | | domainConf = new DomainFakeCfg(TEST_ROOT_DN2, 1704, replServers); |
| | | domain21 = startNewDomain(domainConf, eclInclude, eclInclude); |
| | | |
| | | sleep(1000); |
| | | |
| | | addEntry(createEntry(baseDn2)); |
| | | addEntry(createEntry(TEST_ROOT_DN2)); |
| | | addEntry(createEntry(baseDn3)); |
| | | |
| | | String lentry = |
| | |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.ReplicaDBCursor; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.testng.annotations.Test; |
| | | |
| | |
| | | testRoot = createCleanDir(); |
| | | |
| | | dbEnv = new ReplicationDbEnv(testRoot.getPath(), replicationServer); |
| | | handler = new DbHandler(1, TEST_ROOT_DN_STRING, replicationServer, dbEnv, 5000); |
| | | handler = new DbHandler(1, DN.decode(TEST_ROOT_DN_STRING), replicationServer, dbEnv, 5000); |
| | | |
| | | CSNGenerator gen = new CSNGenerator( 1, 0); |
| | | CSN csn1 = gen.newCSN(); |
| | |
| | | |
| | | testRoot = createCleanDir(); |
| | | dbEnv = new ReplicationDbEnv(testRoot.getPath(), replicationServer); |
| | | handler = new DbHandler(1, TEST_ROOT_DN_STRING, replicationServer, dbEnv, 5000); |
| | | handler = new DbHandler(1, DN.decode(TEST_ROOT_DN_STRING), replicationServer, dbEnv, 5000); |
| | | |
| | | // Creates changes added to the dbHandler |
| | | CSNGenerator gen = new CSNGenerator( 1, 0); |
| | |
| | | |
| | | testRoot = createCleanDir(); |
| | | dbEnv = new ReplicationDbEnv(testRoot.getPath(), replicationServer); |
| | | handler = new DbHandler(1, TEST_ROOT_DN_STRING, replicationServer, dbEnv, 10); |
| | | handler = new DbHandler(1, DN.decode(TEST_ROOT_DN_STRING), replicationServer, dbEnv, 10); |
| | | handler.setCounterWindowSize(counterWindow); |
| | | |
| | | // Populate the db with 'max' msg |
| | |
| | | debugInfo(tn,"SHUTDOWN handler and recreate"); |
| | | handler.shutdown(); |
| | | |
| | | handler = new DbHandler(1, TEST_ROOT_DN_STRING, replicationServer, dbEnv, 10); |
| | | handler = new DbHandler(1, DN.decode(TEST_ROOT_DN_STRING), replicationServer, dbEnv, 10); |
| | | handler.setCounterWindowSize(counterWindow); |
| | | |
| | | // Test first and last |
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.je.DraftCNDB.DraftCNDBCursor; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.testng.annotations.Test; |
| | | |
| | |
| | | String value2 = "value2"; |
| | | String value3 = "value3"; |
| | | |
| | | String baseDN1 = "baseDN1"; |
| | | String baseDN2 = "baseDN2"; |
| | | String baseDN3 = "baseDN3"; |
| | | DN baseDN1 = DN.decode("o=baseDN1"); |
| | | DN baseDN2 = DN.decode("o=baseDN2"); |
| | | DN baseDN3 = DN.decode("o=baseDN3"); |
| | | |
| | | CSNGenerator gen = new CSNGenerator(1, 0); |
| | | CSN csn1 = gen.newCSN(); |
| | |
| | | } |
| | | } |
| | | |
| | | private void assertEqualTo(CNIndexRecord data, CSN csn, String baseDN, |
| | | String cookie) |
| | | private void assertEqualTo(CNIndexRecord data, CSN csn, DN baseDN, String cookie) |
| | | { |
| | | assertEquals(data.getCSN(), csn); |
| | | assertEquals(data.getBaseDN(), baseDN); |
| | |
| | | String value2 = "value2"; |
| | | String value3 = "value3"; |
| | | |
| | | String baseDN1 = "baseDN1"; |
| | | String baseDN2 = "baseDN2"; |
| | | String baseDN3 = "baseDN3"; |
| | | DN baseDN1 = DN.decode("o=baseDN1"); |
| | | DN baseDN2 = DN.decode("o=baseDN2"); |
| | | DN baseDN3 = DN.decode("o=baseDN3"); |
| | | |
| | | CSNGenerator gen = new CSNGenerator(1, 0); |
| | | CSN csn1 = gen.newCSN(); |
| | |
| | | */ |
| | | package org.opends.server.replication.service; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | |
| | | import java.io.IOException; |
| | | import java.io.InputStream; |
| | | import java.io.OutputStream; |
| | |
| | | |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.ResultCode; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | |
| | | /** |
| | | * This class is the minimum implementation of a Concrete ReplicationDomain |
| | | * used to test the Generic Replication Service. |
| | |
| | | * A blocking queue that is used to send the UpdateMsg received from the |
| | | * Replication Service. |
| | | */ |
| | | private BlockingQueue<UpdateMsg> queue = null; |
| | | private BlockingQueue<UpdateMsg> queue; |
| | | |
| | | /** A string that will be exported should exportBackend be called. */ |
| | | private String exportString = null; |
| | | private String exportString; |
| | | |
| | | /** |
| | | * A StringBuilder that will be used to build a build a new String should the |
| | | * import be called. |
| | | */ |
| | | private StringBuilder importString = null; |
| | | private StringBuilder importString; |
| | | |
| | | private int exportedEntryCount; |
| | | |
| | | private long generationID = 1; |
| | | |
| | | public FakeReplicationDomain( |
| | | String baseDN, |
| | | DN baseDN, |
| | | int serverID, |
| | | Collection<String> replicationServers, |
| | | int window, |
| | |
| | | } |
| | | |
| | | public FakeReplicationDomain( |
| | | String baseDN, |
| | | DN baseDN, |
| | | int serverID, |
| | | Collection<String> replicationServers, |
| | | int window, |
| | |
| | | |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.ResultCode; |
| | | |
| | |
| | | private BlockingQueue<UpdateMsg> queue = null; |
| | | |
| | | public FakeStressReplicationDomain( |
| | | String baseDN, |
| | | DN baseDN, |
| | | int serverID, |
| | | Collection<String> replicationServers, |
| | | int window, |
| | |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.testng.annotations.DataProvider; |
| | | import org.testng.annotations.Test; |
| | |
| | | int domain1ServerId, int domain2ServerId) |
| | | throws Exception |
| | | { |
| | | String testService = "test"; |
| | | DN testService = DN.decode("o=test"); |
| | | ReplicationServer replServer1 = null; |
| | | ReplicationServer replServer2 = null; |
| | | FakeReplicationDomain domain1 = null; |
| | |
| | | @Test(enabled=false) |
| | | public void publishPerf() throws Exception |
| | | { |
| | | String testService = "test"; |
| | | DN testService = DN.decode("o=test"); |
| | | ReplicationServer replServer1 = null; |
| | | int replServerID1 = 10; |
| | | FakeReplicationDomain domain1 = null; |
| | |
| | | public void exportAndImport(int serverId1, int serverId2) throws Exception |
| | | { |
| | | final int ENTRYCOUNT=5000; |
| | | String testService = "test"; |
| | | DN testService = DN.decode("o=test"); |
| | | ReplicationServer replServer = null; |
| | | int replServerID = 11; |
| | | FakeReplicationDomain domain1 = null; |
| | |
| | | public void exportAndImportAcross2ReplServers() throws Exception |
| | | { |
| | | final int ENTRYCOUNT=5000; |
| | | String testService = "test"; |
| | | DN testService = DN.decode("o=test"); |
| | | ReplicationServer replServer2 = null; |
| | | ReplicationServer replServer1 = null; |
| | | int replServerID = 11; |
| | |
| | | @Test(enabled=false) |
| | | public void senderInitialize() throws Exception |
| | | { |
| | | String testService = "test"; |
| | | DN testService = DN.decode("o=test"); |
| | | ReplicationServer replServer = null; |
| | | int replServerID = 12; |
| | | FakeStressReplicationDomain domain1 = null; |
| | |
| | | @Test(enabled=false) |
| | | public void receiverInitialize() throws Exception |
| | | { |
| | | String testService = "test"; |
| | | DN testService = DN.decode("o=test"); |
| | | ReplicationServer replServer = null; |
| | | int replServerID = 11; |
| | | FakeStressReplicationDomain domain1 = null; |