| | |
| | | import static org.opends.server.util.StaticUtils.createEntry; |
| | | import static org.opends.server.util.StaticUtils.getFileForPath; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | import static org.opends.server.replication.common.StatusMachine.*; |
| | | |
| | | import java.io.File; |
| | | import java.io.IOException; |
| | |
| | | import java.util.List; |
| | | import java.util.NoSuchElementException; |
| | | import java.util.SortedMap; |
| | | import java.util.SortedSet; |
| | | import java.util.TreeMap; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.zip.Adler32; |
| | | import java.util.zip.CheckedOutputStream; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | |
| | | import org.opends.server.protocols.ldap.LDAPAttribute; |
| | | import org.opends.server.protocols.ldap.LDAPFilter; |
| | | import org.opends.server.protocols.ldap.LDAPModification; |
| | | import org.opends.server.replication.common.AssuredMode; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ChangeNumberGenerator; |
| | | import org.opends.server.replication.common.DSInfo; |
| | | import org.opends.server.replication.common.RSInfo; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.AckMessage; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.common.StatusMachine; |
| | | import org.opends.server.replication.common.StatusMachineEvent; |
| | | import org.opends.server.replication.protocol.AckMsg; |
| | | import org.opends.server.replication.protocol.AddContext; |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.ChangeStatusMsg; |
| | | import org.opends.server.replication.protocol.DeleteContext; |
| | | import org.opends.server.replication.protocol.DoneMessage; |
| | | import org.opends.server.replication.protocol.EntryMessage; |
| | | import org.opends.server.replication.protocol.ErrorMessage; |
| | | import org.opends.server.replication.protocol.HeartbeatMessage; |
| | | import org.opends.server.replication.protocol.InitializeRequestMessage; |
| | | import org.opends.server.replication.protocol.InitializeTargetMessage; |
| | | import org.opends.server.replication.protocol.DoneMsg; |
| | | import org.opends.server.replication.protocol.EntryMsg; |
| | | import org.opends.server.replication.protocol.ErrorMsg; |
| | | import org.opends.server.replication.protocol.HeartbeatMsg; |
| | | import org.opends.server.replication.protocol.InitializeRequestMsg; |
| | | import org.opends.server.replication.protocol.InitializeTargetMsg; |
| | | import org.opends.server.replication.protocol.ModifyContext; |
| | | import org.opends.server.replication.protocol.ModifyDNMsg; |
| | | import org.opends.server.replication.protocol.ModifyDnContext; |
| | | import org.opends.server.replication.protocol.ModifyMsg; |
| | | import org.opends.server.replication.protocol.OperationContext; |
| | | import org.opends.server.replication.protocol.ReplSessionSecurity; |
| | | import org.opends.server.replication.protocol.ReplicationMessage; |
| | | import org.opends.server.replication.protocol.ResetGenerationId; |
| | | import org.opends.server.replication.protocol.RoutableMessage; |
| | | import org.opends.server.replication.protocol.UpdateMessage; |
| | | import org.opends.server.replication.protocol.ReplicationMsg; |
| | | import org.opends.server.replication.protocol.ResetGenerationIdMsg; |
| | | import org.opends.server.replication.protocol.RoutableMsg; |
| | | import org.opends.server.replication.protocol.TopologyMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.tasks.InitializeTargetTask; |
| | | import org.opends.server.tasks.InitializeTask; |
| | | import org.opends.server.tasks.TaskUtils; |
| | | import org.opends.server.types.AttributeBuilder; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.ExistingFileBehavior; |
| | | import org.opends.server.types.AbstractOperation; |
| | | import org.opends.server.types.Attribute; |
| | |
| | | // The update to replay message queue where the listener thread is going to |
| | | // push incoming update messages. |
| | | private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue; |
| | | private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs = |
| | | new TreeMap<ChangeNumber, UpdateMessage>(); |
| | | private SortedMap<ChangeNumber, UpdateMsg> waitingAckMsgs = |
| | | new TreeMap<ChangeNumber, UpdateMsg>(); |
| | | private AtomicInteger numRcvdUpdates = new AtomicInteger(0); |
| | | private AtomicInteger numSentUpdates = new AtomicInteger(0); |
| | | private AtomicInteger numProcessedUpdates = new AtomicInteger(); |
| | |
| | | /** |
| | | * This object is used to store the list of update currently being |
| | | * done on the local database. |
| | | * Is is usefull to make sure that the local operations are sent in a |
| | | * Is is useful to make sure that the local operations are sent in a |
| | | * correct order to the replication server and that the ServerState |
| | | * is not updated too early. |
| | | */ |
| | |
| | | /** |
| | | * It contain the updates that were done on other servers, transmitted |
| | | * by the replication server and that are currently replayed. |
| | | * It is usefull to make sure that dependencies between operations |
| | | * are correctly fullfilled and to to make sure that the ServerState is |
| | | * It is useful to make sure that dependencies between operations |
| | | * are correctly fulfilled and to to make sure that the ServerState is |
| | | * not updated too early. |
| | | */ |
| | | private RemotePendingChanges remotePendingChanges; |
| | |
| | | * server. Zero means heartbeats are off. |
| | | */ |
| | | private long heartbeatInterval = 0; |
| | | short serverId; |
| | | private short serverId; |
| | | |
| | | // The context related to an import or export being processed |
| | | // Null when none is being processed. |
| | |
| | | |
| | | private Collection<String> replicationServers; |
| | | |
| | | private DN baseDN; |
| | | private DN baseDn; |
| | | |
| | | private boolean shutdown = false; |
| | | |
| | |
| | | |
| | | private int window = 100; |
| | | |
| | | /* |
| | | * Assured mode properties |
| | | */ |
| | | // Is assured mode enabled or not for this domain ? |
| | | private boolean assured = false; |
| | | // Assured sub mode (used when assured is true) |
| | | private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE; |
| | | // Safe Data level (used when assuredMode is SAFE_DATA) |
| | | private byte assuredSdLevel = (byte)1; |
| | | // Timeout (in milliseconds) when waiting for acknowledgments |
| | | private long assuredTimeout = 1000; |
| | | |
| | | // Group id |
| | | private byte groupId = (byte)1; |
| | | // Referrals urls to be published to other servers of the topology |
| | | // TODO: fill that with all currently opened urls if no urls configured |
| | | private List<String> refUrls = new ArrayList<String>(); |
| | | |
| | | // Current status for this replicated domain |
| | | private ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS; |
| | | |
| | | /* |
| | | * Properties for the last topology info received from the network. |
| | | */ |
| | | // Info for other DSs. |
| | | // Warning: does not contain info for us (for our server id) |
| | | private List<DSInfo> dsList = new ArrayList<DSInfo>(); |
| | | // Info for other RSs. |
| | | private List<RSInfo> rsList = new ArrayList<RSInfo>(); |
| | | |
| | | /** |
| | | * The isolation policy that this domain is going to use. |
| | | * This field describes the behavior of the domain when an update is |
| | | * attempted and the domain could not connect to any Replication Server. |
| | | * Possible values are accept-updates or deny-updates, but other values |
| | | * may be added in the futur. |
| | | * may be added in the future. |
| | | */ |
| | | private IsolationPolicy isolationpolicy; |
| | | |
| | |
| | | // The input stream for the import |
| | | ReplLDIFInputStream ldifImportInputStream = null; |
| | | // The target in the case of an export |
| | | short exportTarget = RoutableMessage.UNKNOWN_SERVER; |
| | | short exportTarget = RoutableMsg.UNKNOWN_SERVER; |
| | | // The source in the case of an import |
| | | short importSource = RoutableMessage.UNKNOWN_SERVER; |
| | | short importSource = RoutableMsg.UNKNOWN_SERVER; |
| | | |
| | | // The total entry count expected to be processed |
| | | long entryCount = 0; |
| | |
| | | |
| | | /** |
| | | * Initializes the import/export counters with the provider value. |
| | | * @param count The value with which to initialize the counters. |
| | | * @param total |
| | | * @param left |
| | | * @throws DirectoryException |
| | | */ |
| | | public void setCounters(long total, long left) |
| | | throws DirectoryException |
| | |
| | | /** |
| | | * Update the counters of the task for each entry processed during |
| | | * an import or export. |
| | | * @throws DirectoryException |
| | | */ |
| | | public void updateCounters() |
| | | throws DirectoryException |
| | |
| | | */ |
| | | public ExportThread(short target) |
| | | { |
| | | super("Export thread"); |
| | | super("Export thread " + serverId); |
| | | this.target = target; |
| | | } |
| | | |
| | |
| | | LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue) |
| | | throws ConfigException |
| | | { |
| | | super("replicationDomain_" + configuration.getBaseDN()); |
| | | super("Replication State Saver for server id " + configuration.getServerId() |
| | | + " and domain " + configuration.getBaseDN()); |
| | | |
| | | // Read the configuration parameters. |
| | | replicationServers = configuration.getReplicationServer(); |
| | | serverId = (short) configuration.getServerId(); |
| | | baseDN = configuration.getBaseDN(); |
| | | baseDn = configuration.getBaseDN(); |
| | | window = configuration.getWindowSize(); |
| | | heartbeatInterval = configuration.getHeartbeatInterval(); |
| | | isolationpolicy = configuration.getIsolationPolicy(); |
| | |
| | | this.updateToReplayQueue = updateToReplayQueue; |
| | | |
| | | /* |
| | | * Fill assured configuration properties |
| | | */ |
| | | AssuredType assuredType = configuration.getAssuredType(); |
| | | switch (assuredType) |
| | | { |
| | | case NOT_ASSURED: |
| | | assured = false; |
| | | break; |
| | | case SAFE_DATA: |
| | | assured = true; |
| | | this.assuredMode = AssuredMode.SAFE_DATA_MODE; |
| | | break; |
| | | case SAFE_READ: |
| | | assured = true; |
| | | this.assuredMode = AssuredMode.SAFE_READ_MODE; |
| | | break; |
| | | } |
| | | this.assuredSdLevel = (byte)configuration.getAssuredSdLevel(); |
| | | this.groupId = (byte)configuration.getGroupId(); |
| | | this.assuredTimeout = configuration.getAssuredTimeout(); |
| | | SortedSet<String> urls = configuration.getReferralsUrl(); |
| | | if (urls != null) |
| | | { |
| | | for (String url : urls) |
| | | { |
| | | this.refUrls.add(url); |
| | | } |
| | | } |
| | | |
| | | /* |
| | | * Modify conflicts are solved for all suffixes but the schema suffix |
| | | * because we don't want to store extra information in the schema |
| | | * ldif files. |
| | | * This has no negative impact because the changes on schema should |
| | | * not produce conflicts. |
| | | */ |
| | | if (baseDN.compareTo(DirectoryServer.getSchemaDN()) == 0) |
| | | if (baseDn.compareTo(DirectoryServer.getSchemaDN()) == 0) |
| | | { |
| | | solveConflictFlag = false; |
| | | } |
| | |
| | | * Create a new Persistent Server State that will be used to store |
| | | * the last ChangeNmber seen from all LDAP servers in the topology. |
| | | */ |
| | | state = new PersistentServerState(baseDN, serverId); |
| | | state = new PersistentServerState(baseDn, serverId); |
| | | |
| | | /* |
| | | * Create a replication monitor object responsible for publishing |
| | |
| | | monitor = new ReplicationMonitor(this); |
| | | DirectoryServer.registerMonitorProvider(monitor); |
| | | |
| | | Backend backend = retrievesBackend(baseDN); |
| | | Backend backend = retrievesBackend(baseDn); |
| | | if (backend == null) |
| | | { |
| | | throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get( |
| | | baseDN.toNormalizedString())); |
| | | baseDn.toNormalizedString())); |
| | | } |
| | | |
| | | try |
| | |
| | | catch (DirectoryException e) |
| | | { |
| | | logError(ERR_LOADING_GENERATION_ID.get( |
| | | baseDN.toNormalizedString(), e.getLocalizedMessage())); |
| | | baseDn.toNormalizedString(), e.getLocalizedMessage())); |
| | | } |
| | | |
| | | /* |
| | | * create the broker object used to publish and receive changes |
| | | */ |
| | | broker = new ReplicationBroker(state, baseDN, serverId, maxReceiveQueue, |
| | | maxReceiveDelay, maxSendQueue, maxSendDelay, window, |
| | | broker = new ReplicationBroker(this, state, baseDn, serverId, |
| | | maxReceiveQueue, maxReceiveDelay, maxSendQueue, maxSendDelay, window, |
| | | heartbeatInterval, generationId, |
| | | new ReplSessionSecurity(configuration)); |
| | | new ReplSessionSecurity(configuration),getGroupId()); |
| | | |
| | | broker.start(replicationServers); |
| | | |
| | |
| | | */ |
| | | public DN getBaseDN() |
| | | { |
| | | return baseDN; |
| | | return baseDn; |
| | | } |
| | | |
| | | /** |
| | |
| | | if ((!deleteOperation.isSynchronizationOperation()) |
| | | && (!brokerIsConnected(deleteOperation))) |
| | | { |
| | | Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString()); |
| | | Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString()); |
| | | return new SynchronizationProviderResult.StopProcessing( |
| | | ResultCode.UNWILLING_TO_PERFORM, msg); |
| | | } |
| | |
| | | /* |
| | | * This is a replication operation |
| | | * Check that the modified entry has the same entryuuid |
| | | * has was in the original message. |
| | | * as it was in the original message. |
| | | */ |
| | | String operationEntryUUID = ctx.getEntryUid(); |
| | | String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry); |
| | |
| | | if ((!addOperation.isSynchronizationOperation()) |
| | | && (!brokerIsConnected(addOperation))) |
| | | { |
| | | Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString()); |
| | | Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString()); |
| | | return new SynchronizationProviderResult.StopProcessing( |
| | | ResultCode.UNWILLING_TO_PERFORM, msg); |
| | | } |
| | |
| | | if ((!modifyDNOperation.isSynchronizationOperation()) |
| | | && (!brokerIsConnected(modifyDNOperation))) |
| | | { |
| | | Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString()); |
| | | Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString()); |
| | | return new SynchronizationProviderResult.StopProcessing( |
| | | ResultCode.UNWILLING_TO_PERFORM, msg); |
| | | } |
| | |
| | | * parent is the same as when the operation was performed. |
| | | */ |
| | | String newParentId = findEntryId(modifyDNOperation.getNewSuperior()); |
| | | if ((newParentId != null) && |
| | | if ((newParentId != null) && (ctx.getNewParentId() != null) && |
| | | (!newParentId.equals(ctx.getNewParentId()))) |
| | | { |
| | | return new SynchronizationProviderResult.StopProcessing( |
| | |
| | | if ((!modifyOperation.isSynchronizationOperation()) |
| | | && (!brokerIsConnected(modifyOperation))) |
| | | { |
| | | Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString()); |
| | | Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString()); |
| | | return new SynchronizationProviderResult.StopProcessing( |
| | | ResultCode.UNWILLING_TO_PERFORM, msg); |
| | | } |
| | |
| | | findEntryId(addOperation.getEntryDN().getParentDNInSuffix())); |
| | | |
| | | addOperation.setAttachment(SYNCHROCONTEXT, ctx); |
| | | Historical.generateState(addOperation); |
| | | } |
| | | |
| | | /** |
| | |
| | | * also responsible for updating the list of pending changes |
| | | * @return the received message - null if none |
| | | */ |
| | | public UpdateMessage receive() |
| | | public UpdateMsg receive() |
| | | { |
| | | UpdateMessage update = null; |
| | | UpdateMsg update = null; |
| | | |
| | | while (update == null) |
| | | while ( (update == null) && (!shutdown) ) |
| | | { |
| | | InitializeRequestMessage initMsg = null; |
| | | ReplicationMessage msg; |
| | | InitializeRequestMsg initMsg = null; |
| | | ReplicationMsg msg; |
| | | try |
| | | { |
| | | msg = broker.receive(); |
| | |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | if (!(msg instanceof HeartbeatMessage)) |
| | | if (!(msg instanceof HeartbeatMsg)) |
| | | TRACER.debugVerbose("Message received <" + msg + ">"); |
| | | |
| | | if (msg instanceof AckMessage) |
| | | if (msg instanceof AckMsg) |
| | | { |
| | | AckMessage ack = (AckMessage) msg; |
| | | AckMsg ack = (AckMsg) msg; |
| | | receiveAck(ack); |
| | | } |
| | | else if (msg instanceof InitializeRequestMessage) |
| | | else if (msg instanceof InitializeRequestMsg) |
| | | { |
| | | // Another server requests us to provide entries |
| | | // for a total update |
| | | initMsg = (InitializeRequestMessage)msg; |
| | | initMsg = (InitializeRequestMsg)msg; |
| | | } |
| | | else if (msg instanceof InitializeTargetMessage) |
| | | else if (msg instanceof InitializeTargetMsg) |
| | | { |
| | | // Another server is exporting its entries to us |
| | | InitializeTargetMessage importMsg = (InitializeTargetMessage) msg; |
| | | InitializeTargetMsg importMsg = (InitializeTargetMsg) msg; |
| | | |
| | | try |
| | | { |
| | |
| | | catch(DirectoryException de) |
| | | { |
| | | // Returns an error message to notify the sender |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(importMsg.getsenderID(), |
| | | ErrorMsg errorMsg = |
| | | new ErrorMsg(importMsg.getsenderID(), |
| | | de.getMessageObject()); |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(de.getMessageObject()); |
| | |
| | | broker.publish(errorMsg); |
| | | } |
| | | } |
| | | else if (msg instanceof ErrorMessage) |
| | | else if (msg instanceof ErrorMsg) |
| | | { |
| | | if (ieContext != null) |
| | | { |
| | |
| | | // - or before an import really started |
| | | // For example, when we publish a request and the |
| | | // replicationServer did not find any import source. |
| | | abandonImportExport((ErrorMessage)msg); |
| | | abandonImportExport((ErrorMsg)msg); |
| | | } |
| | | else |
| | | { |
| | | /* We can receive an error message from the replication server |
| | | * in the following cases : |
| | | * - we connected with an incorrect generation id |
| | | /* |
| | | * Log error message |
| | | */ |
| | | ErrorMessage errorMsg = (ErrorMessage)msg; |
| | | ErrorMsg errorMsg = (ErrorMsg)msg; |
| | | logError(ERR_ERROR_MSG_RECEIVED.get( |
| | | errorMsg.getDetails())); |
| | | } |
| | | } |
| | | else if (msg instanceof UpdateMessage) |
| | | if (msg instanceof TopologyMsg) |
| | | { |
| | | update = (UpdateMessage) msg; |
| | | TopologyMsg topoMsg = (TopologyMsg)msg; |
| | | receiveTopo(topoMsg); |
| | | } |
| | | if (msg instanceof ChangeStatusMsg) |
| | | { |
| | | ChangeStatusMsg csMsg = (ChangeStatusMsg)msg; |
| | | receiveChangeStatus(csMsg); |
| | | } |
| | | else if (msg instanceof UpdateMsg) |
| | | { |
| | | update = (UpdateMsg) msg; |
| | | receiveUpdate(update); |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Do the necessary processing when an UpdateMessage was received. |
| | | * Processes an incoming TopologyMsg. |
| | | * Updates the structures for the local view of the topology. |
| | | * |
| | | * @param update The received UpdateMessage. |
| | | * @param topoMsg The topology information received from RS. |
| | | */ |
| | | public void receiveUpdate(UpdateMessage update) |
| | | public void receiveTopo(TopologyMsg topoMsg) |
| | | { |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Replication domain " + baseDn |
| | | + " received topology info update:\n" + topoMsg); |
| | | |
| | | // Store new lists |
| | | synchronized(getDsList()) |
| | | { |
| | | synchronized(getRsList()) |
| | | { |
| | | dsList = topoMsg.getDsList(); |
| | | rsList = topoMsg.getRsList(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Set the initial status of the domain, once he is connected to the topology. |
| | | * @param initStatus The status to enter the state machine with |
| | | */ |
| | | public void setInitialStatus(ServerStatus initStatus) |
| | | { |
| | | // Sanity check: is it a valid initial status? |
| | | if (!isValidInitialStatus(initStatus)) |
| | | { |
| | | Message msg = ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(), |
| | | baseDn.toString(), Short.toString(serverId)); |
| | | logError(msg); |
| | | } else |
| | | { |
| | | status = initStatus; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Processes an incoming ChangeStatusMsg. Compute new status according to |
| | | * given order. Then update domain for being compliant with new status |
| | | * definition. |
| | | * @param csMsg The received status message |
| | | */ |
| | | private void receiveChangeStatus(ChangeStatusMsg csMsg) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Replication domain " + baseDn + |
| | | " received change status message:\n" + csMsg); |
| | | |
| | | ServerStatus reqStatus = csMsg.getRequestedStatus(); |
| | | |
| | | // Translate requested status to a state machine event |
| | | StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus); |
| | | if (event == StatusMachineEvent.INVALID_EVENT) |
| | | { |
| | | Message msg = ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(), |
| | | baseDn.toString(), Short.toString(serverId)); |
| | | logError(msg); |
| | | return; |
| | | } |
| | | |
| | | // Compute new status and do matching tasks |
| | | // Use synchronized as admin task (thread) could order to go in admin status |
| | | // for instance (concurrent with receive thread). |
| | | synchronized (status) |
| | | { |
| | | ServerStatus newStatus = |
| | | StatusMachine.computeNewStatus(status, event); |
| | | |
| | | if (newStatus == ServerStatus.INVALID_STATUS) |
| | | { |
| | | Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(), |
| | | Short.toString(serverId), status.toString(), event.toString()); |
| | | logError(msg); |
| | | return; |
| | | } |
| | | |
| | | // Store new status |
| | | status = newStatus; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Replication domain " + baseDn + |
| | | " new status is: " + status); |
| | | |
| | | // Perform whatever actions are needed to apply properties for being |
| | | // compliant with new status |
| | | updateDomainForNewStatus(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Called when first connection or disconnection detected. |
| | | */ |
| | | public void toNotConnectedStatus() |
| | | { |
| | | // Go into not connected status |
| | | // Use synchronized as somebody could ask another status change at the same |
| | | // time |
| | | synchronized (status) |
| | | { |
| | | StatusMachineEvent event = |
| | | StatusMachineEvent.TO_NOT_CONNECTED_STATUS_EVENT; |
| | | ServerStatus newStatus = |
| | | StatusMachine.computeNewStatus(status, event); |
| | | |
| | | if (newStatus == ServerStatus.INVALID_STATUS) |
| | | { |
| | | Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(), |
| | | Short.toString(serverId), status.toString(), event.toString()); |
| | | logError(msg); |
| | | return; |
| | | } |
| | | |
| | | // Store new status |
| | | status = newStatus; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Replication domain " + baseDn + |
| | | " new status is: " + status); |
| | | |
| | | // Perform whatever actions are needed to apply properties for being |
| | | // compliant with new status |
| | | updateDomainForNewStatus(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Perform whatever actions are needed to apply properties for being |
| | | * compliant with new status. Must be called in synchronized section for |
| | | * status. The new status is already set in status variable. |
| | | */ |
| | | private void updateDomainForNewStatus() |
| | | { |
| | | switch (status) |
| | | { |
| | | case NOT_CONNECTED_STATUS: |
| | | break; |
| | | case NORMAL_STATUS: |
| | | break; |
| | | case DEGRADED_STATUS: |
| | | break; |
| | | case FULL_UPDATE_STATUS: |
| | | // Signal RS we just entered the full update status |
| | | broker.signalStatusChange(status); |
| | | break; |
| | | case BAD_GEN_ID_STATUS: |
| | | break; |
| | | default: |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("updateDomainForNewStatus: unexpected status: " + |
| | | status); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Do the necessary processing when an UpdateMsg was received. |
| | | * |
| | | * @param update The received UpdateMsg. |
| | | */ |
| | | public void receiveUpdate(UpdateMsg update) |
| | | { |
| | | remotePendingChanges.putRemoteUpdate(update); |
| | | numRcvdUpdates.incrementAndGet(); |
| | | } |
| | | |
| | | /** |
| | | * Do the necessary processing when an AckMessage is received. |
| | | * Do the necessary processing when an AckMsg is received. |
| | | * |
| | | * @param ack The AckMessage that was received. |
| | | * @param ack The AckMsg that was received. |
| | | */ |
| | | public void receiveAck(AckMessage ack) |
| | | public void receiveAck(AckMsg ack) |
| | | { |
| | | UpdateMessage update; |
| | | UpdateMsg update; |
| | | ChangeNumber changeNumber = ack.getChangeNumber(); |
| | | |
| | | synchronized (waitingAckMsgs) |
| | |
| | | { |
| | | numReplayedPostOpCalled++; |
| | | } |
| | | UpdateMessage msg = null; |
| | | UpdateMsg msg = null; |
| | | |
| | | // Note that a failed non-replication operation might not have a change |
| | | // number. |
| | |
| | | { |
| | | // Generate a replication message for a successful non-replication |
| | | // operation. |
| | | msg = UpdateMessage.generateMsg(op, isAssured); |
| | | msg = UpdateMsg.generateMsg(op); |
| | | |
| | | if (msg == null) |
| | | { |
| | |
| | | */ |
| | | public void ack(ChangeNumber changeNumber) |
| | | { |
| | | broker.publish(new AckMessage(changeNumber)); |
| | | broker.publish(new AckMsg(changeNumber)); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Create and replay a synchronized Operation from an UpdateMessage. |
| | | * Create and replay a synchronized Operation from an UpdateMsg. |
| | | * |
| | | * @param msg The UpdateMessage to be replayed. |
| | | * @param msg The UpdateMsg to be replayed. |
| | | */ |
| | | public void replay(UpdateMessage msg) |
| | | public void replay(UpdateMsg msg) |
| | | { |
| | | Operation op = null; |
| | | boolean done = false; |
| | | boolean dependency = false; |
| | | ChangeNumber changeNumber = null; |
| | | int retryCount = 10; |
| | | boolean firstTry = true; |
| | | |
| | | // Try replay the operation, then flush (replaying) any pending operation |
| | | // whose dependency has been replayed until no more left. |
| | |
| | | { |
| | | try |
| | | { |
| | | op = msg.createOperation(conn); |
| | | dependency = remotePendingChanges.checkDependencies(op, msg); |
| | | |
| | | while ((!dependency) && (!done) && (retryCount-- > 0)) |
| | | { |
| | | op = msg.createOperation(conn); |
| | | |
| | | op.setInternalOperation(true); |
| | | op.setSynchronizationOperation(true); |
| | | changeNumber = OperationContext.getChangeNumber(op); |
| | |
| | | { |
| | | ModifyOperation newOp = (ModifyOperation) op; |
| | | dependency = remotePendingChanges.checkDependencies(newOp); |
| | | if ((!dependency) && (!firstTry)) |
| | | { |
| | | done = solveNamingConflict(newOp, msg); |
| | | } |
| | | ModifyMsg modifyMsg = (ModifyMsg) msg; |
| | | done = solveNamingConflict(newOp, modifyMsg); |
| | | } else if (op instanceof DeleteOperation) |
| | | { |
| | | DeleteOperation newOp = (DeleteOperation) op; |
| | | dependency = remotePendingChanges.checkDependencies(newOp); |
| | | if ((!dependency) && (!firstTry)) |
| | | { |
| | | done = solveNamingConflict(newOp, msg); |
| | | } |
| | | done = solveNamingConflict(newOp, msg); |
| | | } else if (op instanceof AddOperation) |
| | | { |
| | | AddOperation newOp = (AddOperation) op; |
| | | AddMsg addMsg = (AddMsg) msg; |
| | | dependency = remotePendingChanges.checkDependencies(newOp); |
| | | if ((!dependency) && (!firstTry)) |
| | | { |
| | | done = solveNamingConflict(newOp, addMsg); |
| | | } |
| | | done = solveNamingConflict(newOp, addMsg); |
| | | } else if (op instanceof ModifyDNOperationBasis) |
| | | { |
| | | ModifyDNMsg newMsg = (ModifyDNMsg) msg; |
| | | dependency = remotePendingChanges.checkDependencies(newMsg); |
| | | if ((!dependency) && (!firstTry)) |
| | | { |
| | | ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op; |
| | | done = solveNamingConflict(newOp, msg); |
| | | } |
| | | ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op; |
| | | done = solveNamingConflict(newOp, msg); |
| | | } else |
| | | { |
| | | done = true; // unknown type of operation ?! |
| | |
| | | // however we still need to push this change to the serverState |
| | | updateError(changeNumber); |
| | | } |
| | | } else |
| | | else |
| | | { |
| | | /* |
| | | * Create a new operation as the ConflictResolution |
| | | * different operation. |
| | | */ |
| | | op = msg.createOperation(conn); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | done = true; |
| | | } |
| | | firstTry = false; |
| | | } |
| | | |
| | | if (!done && !dependency) |
| | |
| | | dependency = false; |
| | | changeNumber = null; |
| | | retryCount = 10; |
| | | firstTry = true; |
| | | |
| | | } while (msg != null); |
| | | } |
| | |
| | | * |
| | | * @param dn The dn of the entry for which the unique Id is searched. |
| | | * |
| | | * @return The unique Id of the entry whith the provided DN. |
| | | * @return The unique Id of the entry with the provided DN. |
| | | */ |
| | | private String findEntryId(DN dn) |
| | | static String findEntryId(DN dn) |
| | | { |
| | | if (dn == null) |
| | | return null; |
| | | try |
| | | { |
| | | InternalClientConnection conn = |
| | | InternalClientConnection.getRootConnection(); |
| | | LinkedHashSet<String> attrs = new LinkedHashSet<String>(1); |
| | | attrs.add(ENTRYUIDNAME); |
| | | InternalSearchOperation search = conn.processSearch(dn, |
| | |
| | | } |
| | | |
| | | /** |
| | | * find the current dn of an entry from its entry uuid. |
| | | * find the current DN of an entry from its entry UUID. |
| | | * |
| | | * @param uuid the Entry Unique ID. |
| | | * @return The curernt dn of the entry or null if there is no entry with |
| | | * the specified uuid. |
| | | * @return The current DN of the entry or null if there is no entry with |
| | | * the specified UUID. |
| | | */ |
| | | private DN findEntryDN(String uuid) |
| | | { |
| | | try |
| | | { |
| | | InternalSearchOperation search = conn.processSearch(baseDN, |
| | | InternalSearchOperation search = conn.processSearch(baseDn, |
| | | SearchScope.WHOLE_SUBTREE, |
| | | SearchFilter.createFilterFromString("entryuuid="+uuid)); |
| | | if (search.getResultCode() == ResultCode.SUCCESS) |
| | |
| | | * @return true if the process is completed, false if it must continue.. |
| | | */ |
| | | private boolean solveNamingConflict(ModifyOperation op, |
| | | UpdateMessage msg) |
| | | ModifyMsg msg) |
| | | { |
| | | ResultCode result = op.getResultCode(); |
| | | ModifyContext ctx = (ModifyContext) op.getAttachment(SYNCHROCONTEXT); |
| | |
| | | return true; |
| | | } |
| | | } |
| | | else if (result == ResultCode.NOT_ALLOWED_ON_RDN) |
| | | { |
| | | DN currentDN = findEntryDN(entryUid); |
| | | RDN currentRDN = null; |
| | | if (currentDN != null) |
| | | { |
| | | currentRDN = currentDN.getRDN(); |
| | | } |
| | | else |
| | | { |
| | | // The entry does not exist anymore. |
| | | numResolvedNamingConflicts.incrementAndGet(); |
| | | return true; |
| | | } |
| | | |
| | | // The modify operation is trying to delete the value that is |
| | | // currently used in the RDN. We need to alter the modify so that it does |
| | | // not remove the current RDN value(s). |
| | | |
| | | List<Modification> mods = op.getModifications(); |
| | | for (Modification mod : mods) |
| | | { |
| | | AttributeType modAttrType = mod.getAttribute().getAttributeType(); |
| | | if ((mod.getModificationType() == ModificationType.DELETE) || |
| | | (mod.getModificationType() == ModificationType.REPLACE)) |
| | | { |
| | | if (currentRDN.hasAttributeType(modAttrType)) |
| | | { |
| | | // the attribute can't be deleted because it is used |
| | | // in the RDN, turn this operation is a replace with the |
| | | // current RDN value(s); |
| | | mod.setModificationType(ModificationType.REPLACE); |
| | | Attribute newAttribute = mod.getAttribute(); |
| | | AttributeBuilder attrBuilder; |
| | | if (newAttribute == null) |
| | | { |
| | | attrBuilder = new AttributeBuilder(modAttrType); |
| | | } |
| | | else |
| | | { |
| | | attrBuilder = new AttributeBuilder(newAttribute); |
| | | } |
| | | attrBuilder.add(currentRDN.getAttributeValue(modAttrType)); |
| | | mod.setAttribute(attrBuilder.toAttribute()); |
| | | } |
| | | } |
| | | } |
| | | msg.setMods(mods); |
| | | numResolvedNamingConflicts.incrementAndGet(); |
| | | return false; |
| | | } |
| | | else |
| | | { |
| | | // The other type of errors can not be caused by naming conflicts. |
| | |
| | | * @return true if the process is completed, false if it must continue.. |
| | | */ |
| | | private boolean solveNamingConflict(DeleteOperation op, |
| | | UpdateMessage msg) |
| | | UpdateMsg msg) |
| | | { |
| | | ResultCode result = op.getResultCode(); |
| | | DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT); |
| | |
| | | * @throws Exception When the operation is not valid. |
| | | */ |
| | | private boolean solveNamingConflict(ModifyDNOperation op, |
| | | UpdateMessage msg) throws Exception |
| | | UpdateMsg msg) throws Exception |
| | | { |
| | | ResultCode result = op.getResultCode(); |
| | | ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT); |
| | |
| | | * - don't do anything if the operation is replayed. |
| | | */ |
| | | |
| | | // Construct the new DN to use for the entry. |
| | | DN entryDN = op.getEntryDN(); |
| | | DN newSuperior = findEntryDN(newSuperiorID); |
| | | RDN newRDN = op.getNewRDN(); |
| | | DN parentDN; |
| | | |
| | | if (newSuperior == null) |
| | | { |
| | | parentDN = entryDN.getParent(); |
| | | } |
| | | else |
| | | { |
| | | parentDN = newSuperior; |
| | | } |
| | | |
| | | if ((parentDN == null) || parentDN.isNullDN()) |
| | | { |
| | | /* this should never happen |
| | | * can't solve any conflict in this case. |
| | | */ |
| | | throw new Exception("operation parameters are invalid"); |
| | | } |
| | | |
| | | DN newDN = parentDN.concat(newRDN); |
| | | |
| | | // get the current DN of this entry in the database. |
| | | DN currentDN = findEntryDN(entryUid); |
| | | |
| | | // Construct the new DN to use for the entry. |
| | | DN entryDN = op.getEntryDN(); |
| | | DN newSuperior = null; |
| | | RDN newRDN = op.getNewRDN(); |
| | | |
| | | if (newSuperiorID != null) |
| | | { |
| | | newSuperior = findEntryDN(newSuperiorID); |
| | | } |
| | | else |
| | | { |
| | | newSuperior = entryDN.getParent(); |
| | | } |
| | | |
| | | //If we could not find the new parent entry, we missed this entry |
| | | // earlier or it has disappeared from the database |
| | | // Log this information for the repair tool and mark the entry |
| | | // as conflicting. |
| | | // stop the processing. |
| | | if (newSuperior == null) |
| | | { |
| | | markConflictEntry(op, currentDN, currentDN.getParent().concat(newRDN)); |
| | | numUnresolvedNamingConflicts.incrementAndGet(); |
| | | return true; |
| | | } |
| | | |
| | | DN newDN = newSuperior.concat(newRDN); |
| | | |
| | | if (currentDN == null) |
| | | { |
| | | // The entry targetted by the Modify DN is not in the database |
| | | // The entry targeted by the Modify DN is not in the database |
| | | // anymore. |
| | | // This is a conflict between a delete and this modify DN. |
| | | // The entry has been deleted anymore so we can safely assume |
| | | // The entry has been deleted, we can safely assume |
| | | // that the operation is completed. |
| | | numResolvedNamingConflicts.incrementAndGet(); |
| | | return true; |
| | |
| | | return true; |
| | | } |
| | | |
| | | // If we could not find the new parent entry, we missed this entry |
| | | // earlier or it has disappeared from the database |
| | | // Log this information for the repair tool and mark the entry |
| | | // as conflicting. |
| | | // stop the processing. |
| | | if (newSuperior == null) |
| | | { |
| | | markConflictEntry(op, currentDN, newDN); |
| | | numUnresolvedNamingConflicts.incrementAndGet(); |
| | | return true; |
| | | } |
| | | |
| | | if ((result == ResultCode.NO_SUCH_OBJECT) || |
| | | (result == ResultCode.UNWILLING_TO_PERFORM) || |
| | | (result == ResultCode.OBJECTCLASS_VIOLATION)) |
| | | { |
| | | /* |
| | |
| | | |
| | | msg.setDn(generateConflictRDN(entryUid, |
| | | op.getEntryDN().getRDN().toString()) + "," |
| | | + baseDN); |
| | | + baseDn); |
| | | // reset the parent uid so that the check done is the handleConflict |
| | | // phase does not fail. |
| | | msg.setParentUid(null); |
| | |
| | | } |
| | | } catch (DirectoryException e) |
| | | { |
| | | // log errror and information for the REPAIR tool. |
| | | // log error and information for the REPAIR tool. |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_EXCEPTION_RENAME_CONFLICT_ENTRY.get()); |
| | | mb.append(String.valueOf(entryDN)); |
| | |
| | | |
| | | /** |
| | | * Rename an entry that was conflicting so that it stays below the |
| | | * baseDN of the replicationDomain. |
| | | * baseDn of the replicationDomain. |
| | | * |
| | | * @param conflictOp The Operation that caused the conflict. |
| | | * @param dn The DN of the entry to be renamed. |
| | |
| | | InternalClientConnection.getRootConnection(); |
| | | |
| | | ModifyDNOperation newOp = conn.processModifyDN( |
| | | dn, generateDeleteConflictDn(uid, dn),false, baseDN); |
| | | dn, generateDeleteConflictDn(uid, dn),false, baseDn); |
| | | |
| | | if (newOp.getResultCode() != ResultCode.SUCCESS) |
| | | { |
| | |
| | | InternalClientConnection conn = |
| | | InternalClientConnection.getRootConnection(); |
| | | |
| | | AttributeType attrType = |
| | | DirectoryServer.getAttributeType(DS_SYNC_CONFLICT, true); |
| | | LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>(); |
| | | values.add(new AttributeValue(attrType, conflictDN.toString())); |
| | | Attribute attr = new Attribute(attrType, DS_SYNC_CONFLICT, values); |
| | | AttributeType attrType = DirectoryServer.getAttributeType(DS_SYNC_CONFLICT, |
| | | true); |
| | | Attribute attr = Attributes.create(attrType, new AttributeValue( |
| | | attrType, conflictDN.toString())); |
| | | List<Modification> mods = new ArrayList<Modification>(); |
| | | Modification mod = new Modification(ModificationType.REPLACE, attr); |
| | | mods.add(mod); |
| | |
| | | logError(mb.toMessage()); |
| | | } |
| | | |
| | | // Generate an alert to let the administratot know that some |
| | | // Generate an alert to let the administration know that some |
| | | // conflict could not be solved. |
| | | Message alertMessage = NOTE_UNRESOLVED_CONFLICT.get(conflictDN.toString()); |
| | | DirectoryServer.sendAlertNotification(this, |
| | |
| | | * |
| | | * @param msg The conflicting Add Operation. |
| | | * |
| | | * @throws ASN1Exception When an encoding error happenned manipulating the |
| | | * @throws ASN1Exception When an encoding error happened manipulating the |
| | | * msg. |
| | | */ |
| | | private void addConflict(AddMsg msg) throws ASN1Exception |
| | | { |
| | | // Generate an alert to let the administratot know that some |
| | | // Generate an alert to let the administrator know that some |
| | | // conflict could not be solved. |
| | | Message alertMessage = NOTE_UNRESOLVED_CONFLICT.get(msg.getDn()); |
| | | DirectoryServer.sendAlertNotification(this, |
| | |
| | | * Get the server ID. |
| | | * @return The server ID. |
| | | */ |
| | | public int getServerId() |
| | | public short getServerId() |
| | | { |
| | | return serverId; |
| | | } |
| | |
| | | /** |
| | | * Enable back the domain after a previous disable. |
| | | * The domain will connect back to a replication Server and |
| | | * will recreate threads to listen for messages from the Sycnhronization |
| | | * will recreate threads to listen for messages from the Synchronization |
| | | * server. |
| | | * The generationId will be retrieved or computed if necessary. |
| | | * The ServerState will also be read again from the local database. |
| | |
| | | * should we stop the modifications ? |
| | | */ |
| | | logError(ERR_LOADING_GENERATION_ID.get( |
| | | baseDN.toNormalizedString(), e.getLocalizedMessage())); |
| | | baseDn.toNormalizedString(), e.getLocalizedMessage())); |
| | | return; |
| | | } |
| | | |
| | |
| | | public ResultCode saveGenerationId(long generationId) |
| | | { |
| | | // The generationId is stored in the root entry of the domain. |
| | | ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString()); |
| | | ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDn.toString()); |
| | | |
| | | ArrayList<ASN1OctetString> values = new ArrayList<ASN1OctetString>(); |
| | | ASN1OctetString value = new ASN1OctetString(Long.toString(generationId)); |
| | |
| | | Message message = ERR_UPDATING_GENERATION_ID.get( |
| | | op.getResultCode().getResultCodeName() + " " + |
| | | op.getErrorMessage(), |
| | | baseDN.toString()); |
| | | baseDn.toString()); |
| | | logError(message); |
| | | } |
| | | } |
| | |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "Attempt to read generation ID from DB " + baseDN.toString()); |
| | | "Attempt to read generation ID from DB " + baseDn.toString()); |
| | | |
| | | ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString()); |
| | | ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDn.toString()); |
| | | boolean found = false; |
| | | LDAPFilter filter; |
| | | try |
| | |
| | | Message message = ERR_SEARCHING_GENERATION_ID.get( |
| | | search.getResultCode().getResultCodeName() + " " + |
| | | search.getErrorMessage(), |
| | | baseDN.toString()); |
| | | baseDn.toString()); |
| | | logError(message); |
| | | } |
| | | |
| | |
| | | if (attrs != null) |
| | | { |
| | | Attribute attr = attrs.get(0); |
| | | LinkedHashSet<AttributeValue> values = attr.getValues(); |
| | | if (values.size()>1) |
| | | if (attr.size()>1) |
| | | { |
| | | Message message = ERR_LOADING_GENERATION_ID.get( |
| | | baseDN.toString(), "#Values=" + values.size() + |
| | | baseDn.toString(), "#Values=" + attr.size() + |
| | | " Must be exactly 1 in entry " + |
| | | resultEntry.toLDIFString()); |
| | | logError(message); |
| | | } |
| | | else if (values.size() == 1) |
| | | else if (attr.size() == 1) |
| | | { |
| | | found=true; |
| | | try |
| | | { |
| | | generationId = Long.decode(values.iterator().next(). |
| | | generationId = Long.decode(attr.iterator().next(). |
| | | getStringValue()); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | Message message = ERR_LOADING_GENERATION_ID.get( |
| | | baseDN.toString(), e.getLocalizedMessage()); |
| | | baseDn.toString(), e.getLocalizedMessage()); |
| | | logError(message); |
| | | } |
| | | } |
| | |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Generation ID created for domain base DN=" + |
| | | baseDN.toString() + |
| | | baseDn.toString() + |
| | | " generationId=" + generationId); |
| | | } |
| | | else |
| | |
| | | generationIdSavedStatus = true; |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "Generation ID successfully read from domain base DN=" + baseDN + |
| | | "Generation ID successfully read from domain base DN=" + baseDn + |
| | | " generationId=" + generationId); |
| | | } |
| | | return generationId; |
| | |
| | | { |
| | | ResultCode resultCode = ResultCode.OTHER; |
| | | Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get( |
| | | baseDN.toNormalizedString()); |
| | | baseDn.toNormalizedString()); |
| | | throw new DirectoryException( |
| | | resultCode, message); |
| | | } |
| | | |
| | | ResetGenerationId genIdMessage = null; |
| | | ResetGenerationIdMsg genIdMessage = null; |
| | | |
| | | if (generationIdNewValue == null) |
| | | { |
| | | genIdMessage = new ResetGenerationId(this.generationId); |
| | | genIdMessage = new ResetGenerationIdMsg(this.generationId); |
| | | } |
| | | else |
| | | { |
| | | genIdMessage = new ResetGenerationId(generationIdNewValue); |
| | | genIdMessage = new ResetGenerationIdMsg(generationIdNewValue); |
| | | } |
| | | broker.publish(genIdMessage); |
| | | } |
| | |
| | | */ |
| | | public byte[] receiveEntryBytes() |
| | | { |
| | | ReplicationMessage msg; |
| | | ReplicationMsg msg; |
| | | while (true) |
| | | { |
| | | try |
| | |
| | | if (debugEnabled()) |
| | | TRACER.debugVerbose( |
| | | " sid:" + this.serverId + |
| | | " base DN:" + this.baseDN + |
| | | " base DN:" + this.baseDn + |
| | | " Import EntryBytes received " + msg); |
| | | if (msg == null) |
| | | { |
| | |
| | | return null; |
| | | } |
| | | |
| | | if (msg instanceof EntryMessage) |
| | | if (msg instanceof EntryMsg) |
| | | { |
| | | EntryMessage entryMsg = (EntryMessage)msg; |
| | | EntryMsg entryMsg = (EntryMsg)msg; |
| | | byte[] entryBytes = entryMsg.getEntryBytes(); |
| | | ieContext.updateCounters(); |
| | | return entryBytes; |
| | | } |
| | | else if (msg instanceof DoneMessage) |
| | | else if (msg instanceof DoneMsg) |
| | | { |
| | | // This is the normal termination of the import |
| | | // No error is stored and the import is ended |
| | | // by returning null |
| | | return null; |
| | | } |
| | | else if (msg instanceof ErrorMessage) |
| | | else if (msg instanceof ErrorMsg) |
| | | { |
| | | // This is an error termination during the import |
| | | // The error is stored and the import is ended |
| | | // by returning null |
| | | ErrorMessage errorMsg = (ErrorMessage)msg; |
| | | ErrorMsg errorMsg = (ErrorMsg)msg; |
| | | ieContext.exception = new DirectoryException( |
| | | ResultCode.OTHER, |
| | | errorMsg.getDetails()); |
| | |
| | | * on going. |
| | | * @param errorMsg The error message received. |
| | | */ |
| | | protected void abandonImportExport(ErrorMessage errorMsg) |
| | | protected void abandonImportExport(ErrorMsg errorMsg) |
| | | { |
| | | // FIXME TBD Treat the case where the error happens while entries |
| | | // are being exported |
| | |
| | | if (debugEnabled()) |
| | | TRACER.debugVerbose( |
| | | " abandonImportExport:" + this.serverId + |
| | | " base DN:" + this.baseDN + |
| | | " base DN:" + this.baseDn + |
| | | " Error Msg received " + errorMsg); |
| | | |
| | | if (ieContext != null) |
| | |
| | | throws DirectoryException |
| | | { |
| | | long genID = 0; |
| | | Backend backend = retrievesBackend(this.baseDN); |
| | | long bec = backend.numSubordinates(baseDN, true) + 1; |
| | | Backend backend = retrievesBackend(this.baseDn); |
| | | long bec = backend.numSubordinates(baseDn, true) + 1; |
| | | long entryCount = (bec<1000?bec:1000); |
| | | |
| | | // Acquire a shared lock for the backend. |
| | |
| | | if (checksumOutput) |
| | | { |
| | | ros = new ReplLDIFOutputStream(this, entryCount); |
| | | os = new CheckedOutputStream(ros, new Adler32()); |
| | | os = new CheckedOutputStream(ros, new GenerationIdChecksum()); |
| | | try |
| | | { |
| | | os.write((Long.toString(backend.numSubordinates(baseDN, true) + 1)). |
| | | os.write((Long.toString(backend.numSubordinates(baseDn, true) + 1)). |
| | | getBytes()); |
| | | } |
| | | catch(Exception e) |
| | |
| | | } |
| | | LDIFExportConfig exportConfig = new LDIFExportConfig(os); |
| | | |
| | | // baseDN branch is the only one included in the export |
| | | // baseDn branch is the only one included in the export |
| | | List<DN> includeBranches = new ArrayList<DN>(1); |
| | | includeBranches.add(this.baseDN); |
| | | includeBranches.add(this.baseDn); |
| | | exportConfig.setIncludeBranches(includeBranches); |
| | | |
| | | // For the checksum computing mode, only consider the 'stable' attributes |
| | |
| | | } |
| | | finally |
| | | { |
| | | // Clean up after the export by closing the export config. |
| | | // Will also flush the export and export the remaining entries. |
| | | exportConfig.close(); |
| | | |
| | | if (checksumOutput) |
| | | { |
| | | genID = |
| | | ((CheckedOutputStream)os).getChecksum().getValue(); |
| | | } |
| | | else |
| | | { |
| | | // Clean up after the export by closing the export config. |
| | | // Will also flush the export and export the remaining entries. |
| | | // This is a real export where writer has been initialized. |
| | | exportConfig.close(); |
| | | } |
| | | |
| | | // Release the shared lock on the backend. |
| | | try |
| | |
| | | * Retrieves the backend related to the domain. |
| | | * |
| | | * @return The backend of that domain. |
| | | * @param baseDN The baseDN to retrieve the backend |
| | | * @param baseDn The baseDn to retrieve the backend |
| | | */ |
| | | protected static Backend retrievesBackend(DN baseDN) |
| | | protected static Backend retrievesBackend(DN baseDn) |
| | | { |
| | | // Retrieves the backend related to this domain |
| | | return DirectoryServer.getBackend(baseDN); |
| | | return DirectoryServer.getBackend(baseDn); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void exportLDIFEntry(String lDIFEntry) throws IOException |
| | | { |
| | | // If an error was raised - like receiving an ErrorMessage |
| | | // If an error was raised - like receiving an ErrorMsg |
| | | // we just let down the export. |
| | | if (ieContext.exception != null) |
| | | { |
| | |
| | | throw ioe; |
| | | } |
| | | |
| | | EntryMessage entryMessage = new EntryMessage( |
| | | EntryMsg entryMessage = new EntryMsg( |
| | | serverId, ieContext.exportTarget, lDIFEntry.getBytes()); |
| | | broker.publish(entryMessage); |
| | | |
| | |
| | | acquireIEContext(); |
| | | ieContext.initializeTask = initTask; |
| | | |
| | | InitializeRequestMessage initializeMsg = new InitializeRequestMessage( |
| | | baseDN, serverId, source); |
| | | InitializeRequestMsg initializeMsg = new InitializeRequestMsg( |
| | | baseDn, serverId, source); |
| | | |
| | | // Publish Init request msg |
| | | broker.publish(initializeMsg); |
| | |
| | | Throwable cause; |
| | | if (targetString.equalsIgnoreCase("all")) |
| | | { |
| | | return RoutableMessage.ALL_SERVERS; |
| | | return RoutableMsg.ALL_SERVERS; |
| | | } |
| | | |
| | | // So should be a serverID |
| | |
| | | public void initializeRemote(short target, short requestorID, Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get( |
| | | Short.toString(serverId), |
| | | baseDn.toString(), |
| | | Short.toString(requestorID)); |
| | | logError(msg); |
| | | |
| | | boolean contextAcquired=false; |
| | | |
| | | try |
| | | { |
| | | Backend backend = retrievesBackend(this.baseDN); |
| | | Backend backend = retrievesBackend(this.baseDn); |
| | | |
| | | if (!backend.supportsLDIFExport()) |
| | | { |
| | |
| | | |
| | | // The number of entries to be exported is the number of entries under |
| | | // the base DN entry and the base entry itself. |
| | | long entryCount = backend.numSubordinates(baseDN, true) + 1; |
| | | long entryCount = backend.numSubordinates(baseDn, true) + 1; |
| | | ieContext.exportTarget = target; |
| | | if (initTask != null) |
| | | { |
| | |
| | | ieContext.setCounters(entryCount, entryCount); |
| | | |
| | | // Send start message to the peer |
| | | InitializeTargetMessage initializeMessage = new InitializeTargetMessage( |
| | | baseDN, serverId, ieContext.exportTarget, requestorID, entryCount); |
| | | InitializeTargetMsg initializeMessage = new InitializeTargetMsg( |
| | | baseDn, serverId, ieContext.exportTarget, requestorID, entryCount); |
| | | |
| | | broker.publish(initializeMessage); |
| | | |
| | | exportBackend(false); |
| | | |
| | | // Notify the peer of the success |
| | | DoneMessage doneMsg = new DoneMessage(serverId, |
| | | DoneMsg doneMsg = new DoneMsg(serverId, |
| | | initializeMessage.getDestination()); |
| | | broker.publish(doneMsg); |
| | | |
| | |
| | | catch(DirectoryException de) |
| | | { |
| | | // Notify the peer of the failure |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(target, |
| | | ErrorMsg errorMsg = |
| | | new ErrorMsg(target, |
| | | de.getMessageObject()); |
| | | broker.publish(errorMsg); |
| | | |
| | |
| | | |
| | | throw(de); |
| | | } |
| | | |
| | | msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get( |
| | | Short.toString(serverId), |
| | | baseDn.toString(), |
| | | Short.toString(requestorID)); |
| | | logError(msg); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @param initializeMessage The message that initiated the import. |
| | | * @exception DirectoryException Thrown when an error occurs. |
| | | */ |
| | | protected void initialize(InitializeTargetMessage initializeMessage) |
| | | protected void initialize(InitializeTargetMsg initializeMessage) |
| | | throws DirectoryException |
| | | { |
| | | LDIFImportConfig importConfig = null; |
| | | DirectoryException de = null; |
| | | |
| | | Backend backend = retrievesBackend(baseDN); |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get( |
| | | Short.toString(serverId), |
| | | baseDn.toString(), |
| | | Long.toString(initializeMessage.getRequestorID())); |
| | | logError(msg); |
| | | |
| | | // Go into full update status |
| | | // Use synchronized as somebody could ask another status change at the same |
| | | // time |
| | | synchronized (status) |
| | | { |
| | | StatusMachineEvent event = StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT; |
| | | ServerStatus newStatus = |
| | | StatusMachine.computeNewStatus(status, event); |
| | | |
| | | if (newStatus == ServerStatus.INVALID_STATUS) |
| | | { |
| | | msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(), |
| | | Short.toString(serverId), status.toString(), event.toString()); |
| | | logError(msg); |
| | | return; |
| | | } |
| | | |
| | | // Store new status |
| | | status = newStatus; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Replication domain " + baseDn + |
| | | " new status is: " + status); |
| | | |
| | | // Perform whatever actions are needed to apply properties for being |
| | | // compliant with new status |
| | | updateDomainForNewStatus(); |
| | | } |
| | | |
| | | Backend backend = retrievesBackend(baseDn); |
| | | |
| | | try |
| | | { |
| | |
| | | importConfig = |
| | | new LDIFImportConfig(ieContext.ldifImportInputStream); |
| | | List<DN> includeBranches = new ArrayList<DN>(); |
| | | includeBranches.add(this.baseDN); |
| | | includeBranches.add(this.baseDn); |
| | | importConfig.setIncludeBranches(includeBranches); |
| | | importConfig.setAppendToExistingData(false); |
| | | |
| | |
| | | // Re-enable backend |
| | | closeBackendImport(backend); |
| | | |
| | | backend = retrievesBackend(baseDN); |
| | | backend = retrievesBackend(baseDn); |
| | | } |
| | | |
| | | try |
| | |
| | | { |
| | | throw de; |
| | | } |
| | | |
| | | msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get( |
| | | Short.toString(serverId), |
| | | baseDn.toString(), |
| | | Long.toString(initializeMessage.getRequestorID())); |
| | | logError(msg); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Retrieves a replication domain based on the baseDN. |
| | | * Retrieves a replication domain based on the baseDn. |
| | | * |
| | | * @param baseDN The baseDN of the domain to retrieve |
| | | * @param baseDn The baseDn of the domain to retrieve |
| | | * @return The domain retrieved |
| | | * @throws DirectoryException When an error occurred or no domain |
| | | * match the provided baseDN. |
| | | * match the provided baseDn. |
| | | */ |
| | | public static ReplicationDomain retrievesReplicationDomain(DN baseDN) |
| | | public static ReplicationDomain retrievesReplicationDomain(DN baseDn) |
| | | throws DirectoryException |
| | | { |
| | | ReplicationDomain replicationDomain = null; |
| | | |
| | | // Retrieves the domain |
| | | DirectoryServer.getSynchronizationProviders(); |
| | | for (SynchronizationProvider provider : |
| | | for (SynchronizationProvider<?> provider : |
| | | DirectoryServer.getSynchronizationProviders()) |
| | | { |
| | | if (!( provider instanceof MultimasterReplication)) |
| | |
| | | |
| | | // From the domainDN retrieves the replication domain |
| | | ReplicationDomain sdomain = |
| | | MultimasterReplication.findDomain(baseDN, null); |
| | | MultimasterReplication.findDomain(baseDn, null); |
| | | if (sdomain == null) |
| | | { |
| | | break; |
| | |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(ERR_NO_MATCHING_DOMAIN.get()); |
| | | mb.append(" "); |
| | | mb.append(String.valueOf(baseDN)); |
| | | mb.append(String.valueOf(baseDn)); |
| | | throw new DirectoryException(ResultCode.OTHER, |
| | | mb.toMessage()); |
| | | } |
| | |
| | | */ |
| | | public Backend getBackend() |
| | | { |
| | | return retrievesBackend(baseDN); |
| | | return retrievesBackend(baseDn); |
| | | } |
| | | |
| | | /** |
| | | * Returns a boolean indiciating if an import or export is currently |
| | | * Returns a boolean indicating if an import or export is currently |
| | | * processed. |
| | | * @return The status |
| | | */ |
| | |
| | | |
| | | |
| | | /** |
| | | * Push the modifications contain the in given parameter has |
| | | * Push the modifications contained in the given parameter as |
| | | * a modification that would happen on a local server. |
| | | * The modifications are not applied to the local database, |
| | | * historical information is not updated but a ChangeNumber |
| | |
| | | * @param configuration The configuration to check. |
| | | * @param unacceptableReasons When the configuration is not acceptable, this |
| | | * table is use to return the reasons why this |
| | | * configuration is not acceptbale. |
| | | * configuration is not acceptable. |
| | | * |
| | | * @return true if the configuration is acceptable, false other wise. |
| | | */ |
| | |
| | | // Check that there is not already a domain with the same DN |
| | | DN dn = configuration.getBaseDN(); |
| | | ReplicationDomain domain = MultimasterReplication.findDomain(dn, null); |
| | | if ((domain != null) && (domain.baseDN.equals(dn))) |
| | | if ((domain != null) && (domain.baseDn.equals(dn))) |
| | | { |
| | | Message message = ERR_SYNC_INVALID_DN.get(); |
| | | unacceptableReasons.add(message); |
| | |
| | | else |
| | | return false; |
| | | } |
| | | |
| | | /** |
| | | * Gets the info for DSs in the topology (except us). |
| | | * @return The info for DSs in the topology (except us) |
| | | */ |
| | | public List<DSInfo> getDsList() |
| | | { |
| | | return dsList; |
| | | } |
| | | |
| | | /** |
| | | * Gets the info for RSs in the topology (except the one we are connected |
| | | * to). |
| | | * @return The info for RSs in the topology (except the one we are connected |
| | | * to) |
| | | */ |
| | | public List<RSInfo> getRsList() |
| | | { |
| | | return rsList; |
| | | } |
| | | |
| | | /** |
| | | * Tells if assured replication is enabled for this domain. |
| | | * @return True if assured replication is enabled for this domain. |
| | | */ |
| | | public boolean isAssured() |
| | | { |
| | | return assured; |
| | | } |
| | | |
| | | /** |
| | | * Gives the mode for the assured replication of the domain. |
| | | * @return The mode for the assured replication of the domain. |
| | | */ |
| | | public AssuredMode getAssuredMode() |
| | | { |
| | | return assuredMode; |
| | | } |
| | | |
| | | /** |
| | | * Gives the assured level of the replication of the domain. |
| | | * @return The assured level of the replication of the domain. |
| | | */ |
| | | public byte getAssuredSdLevel() |
| | | { |
| | | return assuredSdLevel; |
| | | } |
| | | |
| | | /** |
| | | * Gets the group id for this domain. |
| | | * @return The group id for this domain. |
| | | */ |
| | | public byte getGroupId() |
| | | { |
| | | return groupId; |
| | | } |
| | | |
| | | /** |
| | | * Gets the referrals URLs this domain publishes. |
| | | * @return The referrals URLs this domain publishes. |
| | | */ |
| | | public List<String> getRefUrls() |
| | | { |
| | | return refUrls; |
| | | } |
| | | |
| | | /** |
| | | * Gets the status for this domain. |
| | | * @return The status for this domain. |
| | | */ |
| | | public ServerStatus getStatus() |
| | | { |
| | | return status; |
| | | } |
| | | } |