| | |
| | | import org.opends.server.replication.protocol.ProtocolSession; |
| | | import org.opends.server.replication.protocol.RoutableMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.service.ReplicationBroker; |
| | | import org.opends.server.replication.service.ReplicationDomain; |
| | | import org.opends.server.replication.service.ReplicationMonitor; |
| | | import org.opends.server.tasks.TaskUtils; |
| | |
| | | */ |
| | | public class LDAPReplicationDomain extends ReplicationDomain |
| | | implements ConfigurationChangeListener<ReplicationDomainCfg>, |
| | | AlertGenerator, InternalSearchListener |
| | | AlertGenerator |
| | | { |
| | | /** |
| | | * This class is used in the session establishment phase |
| | | * when no Replication Server with all the local changes has been found |
| | | * and we therefore need to recover them. |
| | | * A search is then performed on the database using this |
| | | * internalSearchListener. |
| | | */ |
| | | private class ScanSearchListener implements InternalSearchListener |
| | | { |
| | | private ChangeNumber startingChangeNumber = null; |
| | | private ChangeNumber endChangeNumber = null; |
| | | |
| | | public ScanSearchListener( |
| | | ChangeNumber startingChangeNumber, |
| | | ChangeNumber endChangeNumber) |
| | | { |
| | | this.startingChangeNumber = startingChangeNumber; |
| | | this.endChangeNumber = endChangeNumber; |
| | | } |
| | | |
| | | @Override |
| | | public void handleInternalSearchEntry( |
| | | InternalSearchOperation searchOperation, SearchResultEntry searchEntry) |
| | | throws DirectoryException |
| | | { |
| | | // Build the list of Operations that happened on this entry |
| | | // after startingChangeNumber and before endChangeNumber and |
| | | // add them to the replayOperations list |
| | | Iterable<FakeOperation> updates = |
| | | Historical.generateFakeOperations(searchEntry); |
| | | |
| | | for (FakeOperation op : updates) |
| | | { |
| | | ChangeNumber cn = op.getChangeNumber(); |
| | | if ((cn.newer(startingChangeNumber)) && (cn.older(endChangeNumber))) |
| | | { |
| | | synchronized (replayOperations) |
| | | { |
| | | replayOperations.put(cn, op); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void handleInternalSearchReference( |
| | | InternalSearchOperation searchOperation, |
| | | SearchResultReference searchReference) throws DirectoryException |
| | | { |
| | | // Nothing to do. |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * The fully-qualified name of this class. |
| | | */ |
| | | private static final String CLASS_NAME = |
| | |
| | | } |
| | | |
| | | /** |
| | | * The thread that is responsible to update the RS to which this domain is |
| | | * connected in case it is late and there is no RS which is up to date. |
| | | */ |
| | | private class RSUpdater extends DirectoryThread |
| | | { |
| | | private ChangeNumber startChangeNumber; |
| | | protected RSUpdater(ChangeNumber replServerMaxChangeNumber) |
| | | { |
| | | super("Replication Server Updater for server id " + |
| | | serverId + " and domain " + baseDn.toString()); |
| | | this.startChangeNumber = replServerMaxChangeNumber; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void run() |
| | | { |
| | | // Replication server is missing some of our changes: let's |
| | | // send them to him. |
| | | Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get(); |
| | | logError(message); |
| | | |
| | | /* |
| | | * Get all the changes that have not been seen by this |
| | | * replication server and publish them. |
| | | */ |
| | | try |
| | | { |
| | | if (buildAndPublishMissingChanges(startChangeNumber, broker)) |
| | | { |
| | | message = DEBUG_CHANGES_SENT.get(); |
| | | logError(message); |
| | | synchronized(replayOperations) |
| | | { |
| | | replayOperations.clear(); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | /* |
| | | * An error happened trying to search for the updates |
| | | * This server will start accepting again new updates but |
| | | * some inconsistencies will stay between servers. |
| | | * Log an error for the repair tool |
| | | * that will need to re-synchronize the servers. |
| | | */ |
| | | message = ERR_CANNOT_RECOVER_CHANGES.get( |
| | | baseDn.toNormalizedString()); |
| | | logError(message); |
| | | } |
| | | } catch (Exception e) |
| | | { |
| | | /* |
| | | * An error happened trying to search for the updates |
| | | * This server will start accepting again new updates but |
| | | * some inconsistencies will stay between servers. |
| | | * Log an error for the repair tool |
| | | * that will need to re-synchronize the servers. |
| | | */ |
| | | message = ERR_CANNOT_RECOVER_CHANGES.get( |
| | | baseDn.toNormalizedString()); |
| | | logError(message); |
| | | } |
| | | finally |
| | | { |
| | | broker.setRecoveryRequired(false); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Creates a new ReplicationDomain using configuration from configEntry. |
| | | * |
| | | * @param configuration The configuration of this ReplicationDomain. |
| | |
| | | saveGenerationId(generationId); |
| | | } |
| | | |
| | | startPublishService(replicationServers, window, heartbeatInterval, |
| | | configuration.getChangetimeHeartbeatInterval()); |
| | | |
| | | /* |
| | | * ChangeNumberGenerator is used to create new unique ChangeNumbers |
| | | * for each operation done on this replication domain. |
| | |
| | | pendingChanges = |
| | | new PendingChanges(generator, this); |
| | | |
| | | startPublishService(replicationServers, window, heartbeatInterval, |
| | | configuration.getChangetimeHeartbeatInterval()); |
| | | |
| | | remotePendingChanges = new RemotePendingChanges(getServerState()); |
| | | |
| | | // listen for changes on the configuration |
| | |
| | | if ((ourMaxChangeNumber != null) && |
| | | (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) |
| | | { |
| | | |
| | | // Replication server is missing some of our changes: let's |
| | | // send them to him. |
| | | |
| | | Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get(); |
| | | logError(message); |
| | | |
| | | /* |
| | | * Get all the changes that have not been seen by this |
| | | * replication server and populate the replayOperations |
| | | * list. |
| | | */ |
| | | InternalSearchOperation op = searchForChangedEntries( |
| | | baseDn, replServerMaxChangeNumber, this); |
| | | if (op.getResultCode() != ResultCode.SUCCESS) |
| | | { |
| | | /* |
| | | * An error happened trying to search for the updates |
| | | * This server will start accepting again new updates but |
| | | * some inconsistencies will stay between servers. |
| | | * Log an error for the repair tool |
| | | * that will need to re-synchronize the servers. |
| | | */ |
| | | message = ERR_CANNOT_RECOVER_CHANGES.get( |
| | | baseDn.toNormalizedString()); |
| | | logError(message); |
| | | } else |
| | | { |
| | | for (FakeOperation replayOp : |
| | | replayOperations.tailMap(replServerMaxChangeNumber).values()) |
| | | { |
| | | ChangeNumber cn = replayOp.getChangeNumber(); |
| | | /* |
| | | * Because the entry returned by the search operation |
| | | * can contain old historical information, it is |
| | | * possible that some of the FakeOperation are |
| | | * actually older than the last ChangeNumber known by |
| | | * the Replication Server. |
| | | * In such case don't send the operation. |
| | | */ |
| | | if (!cn.newer(replServerMaxChangeNumber)) |
| | | { |
| | | continue; |
| | | } |
| | | |
| | | /* |
| | | * Check if the DeleteOperation has been abandoned before |
| | | * being processed. This is necessary because the replayOperation |
| | | * |
| | | */ |
| | | if (replayOp instanceof FakeDelOperation) |
| | | { |
| | | FakeDelOperation delOp = (FakeDelOperation) replayOp; |
| | | if (findEntryDN(delOp.getUUID()) != null) |
| | | { |
| | | continue; |
| | | } |
| | | } |
| | | message = |
| | | DEBUG_SENDING_CHANGE.get( |
| | | replayOp.getChangeNumber().toString()); |
| | | logError(message); |
| | | session.publish(replayOp.generateMessage()); |
| | | } |
| | | message = DEBUG_CHANGES_SENT.get(); |
| | | logError(message); |
| | | } |
| | | replayOperations.clear(); |
| | | pendingChanges.setRecovering(true); |
| | | broker.setRecoveryRequired(true); |
| | | new RSUpdater(replServerMaxChangeNumber).start(); |
| | | } |
| | | } |
| | | } catch (Exception e) |
| | |
| | | } |
| | | |
| | | /** |
| | | * Build the list of changes that have been processed by this server |
| | | * after the ChangeNumber given as a parameter and publish them |
| | | * using the given session. |
| | | * <p> |
| | | * The leading entries of replayOperations that are older than or equal to |
| | | * the starting ChangeNumber are discarded first. The changes are then |
| | | * searched interval by interval: each iteration searches between the |
| | | * current start ChangeNumber and an end ChangeNumber whose time is 10000 |
| | | * greater (intervals of 10 seconds), then removes from replayOperations, |
| | | * in iteration order, the leading operations that are older than or equal |
| | | * to this end ChangeNumber and for which state.cover() returns true, and |
| | | * publishes them with publishRecovery() once the lock on replayOperations |
| | | * has been released. |
| | | * The next interval starts from the last published change or, when nothing |
| | | * was published, from the end of the interval that was just searched. |
| | | * The loop goes on as long as pendingChanges.RecoveryUntil() returns true |
| | | * and the last search was successful. |
| | | * |
| | | * @param startingChangeNumber The ChangeNumber where we need to start the |
| | | * search |
| | | * @param session The session to use to publish the changes |
| | | * |
| | | * @return A boolean indicating the success of the |
| | | * operation: true when the result code of the last |
| | | * search is SUCCESS. |
| | | * @throws Exception if an Exception happens during the search. |
| | | */ |
| | | public boolean buildAndPublishMissingChanges( |
| | | ChangeNumber startingChangeNumber, |
| | | ReplicationBroker session) |
| | | throws Exception |
| | | { |
| | | // Trim the leading changes in replayOperations that are older than or |
| | | // equal to the startingChangeNumber; stop at the first one that is not. |
| | | synchronized (replayOperations) |
| | | { |
| | | Iterator<ChangeNumber> it = replayOperations.keySet().iterator(); |
| | | while (it.hasNext()) |
| | | { |
| | | if (it.next().olderOrEqual(startingChangeNumber)) |
| | | { |
| | | it.remove(); |
| | | } |
| | | else |
| | | { |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | |
| | | ChangeNumber lastRetrievedChange = null; |
| | | long missingChangesDelta; |
| | | InternalSearchOperation op; |
| | | ChangeNumber currentStartChangeNumber = startingChangeNumber; |
| | | do |
| | | { |
| | | lastRetrievedChange = null; |
| | | // We can't do the search in one go because we need to |
| | | // store the results so that we are sure we send the operations |
| | | // in order and because the list might be large. |
| | | // So we search by intervals of 10 seconds |
| | | // and store the results in the replayOperations list |
| | | // so that they are sorted before sending them. |
| | | missingChangesDelta = currentStartChangeNumber.getTime() + 10000; |
| | | ChangeNumber endChangeNumber = |
| | | new ChangeNumber( |
| | | missingChangesDelta, 0xffffffff, serverId); |
| | | |
| | | ScanSearchListener listener = |
| | | new ScanSearchListener(currentStartChangeNumber, endChangeNumber); |
| | | op = searchForChangedEntries( |
| | | baseDn, currentStartChangeNumber, endChangeNumber, listener); |
| | | |
| | | // Remove from the replayOperations list the leading changes that are older |
| | | // than or equal to endChangeNumber and covered by state; publish them after. |
| | | LinkedList<FakeOperation> opsToSend = new LinkedList<FakeOperation>(); |
| | | synchronized (replayOperations) |
| | | { |
| | | Iterator<FakeOperation> itOp = replayOperations.values().iterator(); |
| | | while (itOp.hasNext()) |
| | | { |
| | | FakeOperation fakeOp = itOp.next(); |
| | | if ((fakeOp.getChangeNumber().olderOrEqual(endChangeNumber)) |
| | | && state.cover(fakeOp.getChangeNumber())) |
| | | { |
| | | lastRetrievedChange = fakeOp.getChangeNumber(); |
| | | opsToSend.add(fakeOp); |
| | | itOp.remove(); |
| | | } |
| | | else |
| | | { |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | |
| | | for (FakeOperation opToSend : opsToSend) |
| | | { |
| | | session.publishRecovery(opToSend.generateMessage()); |
| | | } |
| | | opsToSend.clear(); |
| | | if (lastRetrievedChange != null) |
| | | { |
| | | currentStartChangeNumber = lastRetrievedChange; |
| | | } |
| | | else |
| | | { |
| | | currentStartChangeNumber = endChangeNumber; |
| | | } |
| | | |
| | | } while (pendingChanges.RecoveryUntil(lastRetrievedChange) && |
| | | (op.getResultCode().equals(ResultCode.SUCCESS))); |
| | | |
| | | return op.getResultCode().equals(ResultCode.SUCCESS); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Search for the changes that happened since fromChangeNumber |
| | | * based on the historical attribute. The only changes that will |
| | | * be sent are the ones generated on the serverId provided in |
| | | * fromChangeNumber. |
| | | * @param baseDn the base DN |
| | | * @param fromChangeNumber The change number from which we want the changes |
| | | * @param resultListener that will process the entries returned. |
| | | * @param fromChangeNumber The ChangeNumber from which we want the changes |
| | | * @param lastChangeNumber The max ChangeNumber that the search should return |
| | | * @param resultListener The listener that will process the entries returned |
| | | * @return the internal search operation |
| | | * @throws Exception when raised. |
| | | */ |
| | | public static InternalSearchOperation searchForChangedEntries( |
| | | DN baseDn, |
| | | ChangeNumber fromChangeNumber, |
| | | ChangeNumber lastChangeNumber, |
| | | InternalSearchListener resultListener) |
| | | throws Exception |
| | | { |
| | |
| | | InternalClientConnection.getRootConnection(); |
| | | Integer serverId = fromChangeNumber.getServerId(); |
| | | |
| | | String maxValueForId = "ffffffffffffffff" + |
| | | String.format("%04x", serverId) + "ffffffff"; |
| | | String maxValueForId; |
| | | if (lastChangeNumber == null) |
| | | { |
| | | maxValueForId = "ffffffffffffffff" + String.format("%04x", serverId) |
| | | + "ffffffff"; |
| | | } |
| | | else |
| | | { |
| | | maxValueForId = lastChangeNumber.toString(); |
| | | } |
| | | |
| | | LDAPFilter filter = LDAPFilter.decode( |
| | | "(&(" + Historical.HISTORICALATTRIBUTENAME + ">=dummy:" |
| | |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * Search for the changes that happened since fromChangeNumber |
| | | * based on the historical attribute. The only changes that will |
| | | * be sent are the ones generated on the serverId provided in |
| | | * fromChangeNumber. |
| | | * @param baseDn the base DN |
| | | * @param fromChangeNumber The change number from which we want the changes |
| | | * @param resultListener that will process the entries returned. |
| | | * @return the internal search operation |
| | | * @throws Exception when raised. |
| | | */ |
| | | public void handleInternalSearchEntry( |
| | | InternalSearchOperation searchOperation, |
| | | SearchResultEntry searchEntry) |
| | | public static InternalSearchOperation searchForChangedEntries( |
| | | DN baseDn, |
| | | ChangeNumber fromChangeNumber, |
| | | InternalSearchListener resultListener) |
| | | throws Exception |
| | | { |
| | | /* |
| | | * This call back is called at session establishment phase |
| | | * for each entry that has been changed by this server and the changes |
| | | * have not been sent to any Replication Server. |
| | | * The role of this method is to build equivalent operation from |
| | | * the historical information and add them in the replayOperations |
| | | * table. |
| | | */ |
| | | Iterable<FakeOperation> updates = |
| | | Historical.generateFakeOperations(searchEntry); |
| | | for (FakeOperation op : updates) |
| | | { |
| | | replayOperations.put(op.getChangeNumber(), op); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public void handleInternalSearchReference( |
| | | InternalSearchOperation searchOperation, |
| | | SearchResultReference searchReference) |
| | | { |
| | | // TODO to be implemented |
| | | return searchForChangedEntries( |
| | | baseDn, fromChangeNumber, null, resultListener); |
| | | } |
| | | |
| | | |