| File was renamed from opendj-sdk/opends/src/server/org/opends/server/replication/plugin/SynchronizationDomain.java |
| | |
| | | import static org.opends.server.messages.ConfigMessages.*; |
| | | import static org.opends.server.messages.MessageHandler.getMessage; |
| | | import static org.opends.server.messages.ToolMessages.*; |
| | | import static org.opends.server.messages.SynchronizationMessages.*; |
| | | import static org.opends.server.messages.ReplicationMessages.*; |
| | | import static org.opends.server.replication.plugin.Historical.ENTRYUIDNAME; |
| | | import static org.opends.server.replication.protocol.OperationContext.*; |
| | | import static org.opends.server.util.StaticUtils.createEntry; |
| | |
| | | import org.opends.server.backends.jeb.BackendImpl; |
| | | import org.opends.server.backends.task.Task; |
| | | import org.opends.server.backends.task.TaskState; |
| | | import org.opends.server.config.ConfigAttribute; |
| | | import org.opends.server.config.ConfigEntry; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.config.DNConfigAttribute; |
| | |
| | | import org.opends.server.replication.protocol.ModifyDnContext; |
| | | import org.opends.server.replication.protocol.OperationContext; |
| | | import org.opends.server.replication.protocol.RoutableMessage; |
| | | import org.opends.server.replication.protocol.SynchronizationMessage; |
| | | import org.opends.server.replication.protocol.ReplicationMessage; |
| | | import org.opends.server.replication.protocol.UpdateMessage; |
| | | import org.opends.server.tasks.InitializeTargetTask; |
| | | import org.opends.server.tasks.InitializeTask; |
| | |
| | | |
| | | /** |
| | | * This class implements the bulk part of the Directory Server side |
| | | * of the synchronization code. |
| | | * of the replication code. |
| | | * It contains the root method for publishing a change, |
| | | * processing a change received from the changelog service, |
| | | * handle conflict resolution, |
| | | * handle protocol messages from the changelog server. |
| | | */ |
| | | public class SynchronizationDomain extends DirectoryThread |
| | | public class ReplicationDomain extends DirectoryThread |
| | | implements ConfigurationChangeListener<MultimasterDomainCfg> |
| | | { |
| | | private SynchronizationMonitor monitor; |
| | | private ReplicationMonitor monitor; |
| | | |
| | | private ChangeNumberGenerator changeNumberGenerator; |
| | | private ChangelogBroker broker; |
| | |
| | | private int maxSendDelay = 0; |
| | | |
| | | /** |
| | | * The time in milliseconds between heartbeats from the synchronization |
| | | * The time in milliseconds between heartbeats from the replication |
| | | * server. Zero means heartbeats are off. |
| | | */ |
| | | private long heartbeatInterval = 0; |
| | |
| | | // The task that initiated the operation. |
| | | Task initializeTask; |
| | | // The input stream for the import |
| | | SynchroLDIFInputStream ldifImportInputStream = null; |
| | | ReplLDIFInputStream ldifImportInputStream = null; |
| | | // The target in the case of an export |
| | | short exportTarget = RoutableMessage.UNKNOWN_SERVER; |
| | | // The source in the case of an import |
| | |
| | | |
| | | private DN baseDN; |
| | | |
| | | private List<ConfigAttribute> configAttributes = |
| | | new ArrayList<ConfigAttribute>(); |
| | | |
| | | private boolean shutdown = false; |
| | | |
| | | private InternalClientConnection conn = |
| | |
| | | |
| | | |
| | | /** |
| | | * Creates a new SynchronizationDomain using configuration from configEntry. |
| | | * Creates a new ReplicationDomain using configuration from configEntry. |
| | | * |
| | | * @param configuration The configuration of this SynchronizationDomain. |
| | | * @param configuration The configuration of this ReplicationDomain. |
| | | * @throws ConfigException In case of invalid configuration. |
| | | */ |
| | | public SynchronizationDomain(MultimasterDomainCfg configuration) |
| | | public ReplicationDomain(MultimasterDomainCfg configuration) |
| | | throws ConfigException |
| | | { |
| | | super("Synchronization flush"); |
| | | super("replication flush"); |
| | | |
| | | // Read the configuration parameters. |
| | | changelogServers = configuration.getChangelogServer(); |
| | |
| | | state = new PersistentServerState(baseDN); |
| | | |
| | | /* |
| | | * Create a Synchronization monitor object responsible for publishing |
| | | * Create a replication monitor object responsible for publishing |
| | | * monitoring information below cn=monitor. |
| | | */ |
| | | monitor = new SynchronizationMonitor(this); |
| | | monitor = new ReplicationMonitor(this); |
| | | DirectoryServer.registerMonitorProvider(monitor); |
| | | |
| | | /* |
| | | * ChangeNumberGenerator is used to create new unique ChangeNumbers |
| | | * for each operation done on the synchronization domain. |
| | | * for each operation done on the replication domain. |
| | | */ |
| | | changeNumberGenerator = new ChangeNumberGenerator(serverId, state); |
| | | |
| | |
| | | |
| | | |
| | | /** |
| | | * Returns the base DN of this SynchronizationDomain. |
| | | * Returns the base DN of this ReplicationDomain. |
| | | * |
| | | * @return The base DN of this SynchronizationDomain |
| | | * @return The base DN of this ReplicationDomain |
| | | */ |
| | | public DN getBaseDN() |
| | | { |
| | |
| | | if (ctx != null) |
| | | { |
| | | /* |
| | | * This is a synchronization operation |
| | | * This is a replication operation |
| | | * Check that the modified entry has the same entryuuid |
| | | * as was in the original message. |
| | | */ |
| | |
| | | } |
| | | else |
| | | { |
| | | // There is no Synchronization context attached to the operation |
| | | // so this is not a synchronization operation. |
| | | // There is no replication context attached to the operation |
| | | // so this is not a replication operation. |
| | | ChangeNumber changeNumber = generateChangeNumber(deleteOperation); |
| | | String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry); |
| | | ctx = new DeleteContext(changeNumber, modifiedEntryUUID); |
| | |
| | | && (!parentDnFromCtx.equals(parentDnFromEntryDn))) |
| | | { |
| | | // parentEntry has been renamed |
| | | // Synchronization name conflict resolution is expected to fix that |
| | | // replication name conflict resolution is expected to fix that |
| | | // later in the flow |
| | | addOperation.setResultCode(ResultCode.NO_SUCH_OBJECT); |
| | | return new SynchronizationProviderResult(false); |
| | |
| | | if (ctx != null) |
| | | { |
| | | /* |
| | | * This is a synchronization operation |
| | | * This is a replication operation |
| | | * Check that the modified entry has the same entryuuid |
| | | * as was in the original message. |
| | | */ |
| | |
| | | } |
| | | else |
| | | { |
| | | // There is no Synchronization context attached to the operation |
| | | // so this is not a synchronization operation. |
| | | // There is no replication context attached to the operation |
| | | // so this is not a replication operation. |
| | | ChangeNumber changeNumber = generateChangeNumber(modifyDNOperation); |
| | | String newParentId = null; |
| | | if (modifyDNOperation.getNewSuperior() != null) |
| | |
| | | Entry modifiedEntry = modifyOperation.getModifiedEntry(); |
| | | if (ctx == null) |
| | | { |
| | | // There is no Synchronization context attached to the operation |
| | | // so this is not a synchronization operation. |
| | | // There is no replication context attached to the operation |
| | | // so this is not a replication operation. |
| | | ChangeNumber changeNumber = generateChangeNumber(modifyOperation); |
| | | String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry); |
| | | if (modifiedEntryUUID == null) |
| | |
| | | |
| | | /** |
| | | * The preOperation phase for the add Operation. |
| | | * Its job is to generate the Synchronization context associated to the |
| | | * Its job is to generate the replication context associated to the |
| | | * operation. It is necessary to do it in this phase because contrary to |
| | | * the other operations, the entry uid is not set when the handleConflict |
| | | * phase is called. |
| | |
| | | UpdateMessage update = null; |
| | | while (update == null) |
| | | { |
| | | SynchronizationMessage msg; |
| | | ReplicationMessage msg; |
| | | try |
| | | { |
| | | msg = broker.receive(); |
| | |
| | | } |
| | | UpdateMessage msg = null; |
| | | |
| | | // Note that a failed non-synchronization operation might not have a change |
| | | // Note that a failed non-replication operation might not have a change |
| | | // number. |
| | | ChangeNumber curChangeNumber = OperationContext.getChangeNumber(op); |
| | | |
| | |
| | | |
| | | if ((result == ResultCode.SUCCESS) && (!op.isSynchronizationOperation())) |
| | | { |
| | | // Generate a synchronization message for a successful non-synchronization |
| | | // Generate a replication message for a successful non-replication |
| | | // operation. |
| | | msg = UpdateMessage.generateMsg(op, isAssured); |
| | | |
| | |
| | | } |
| | | else if (!op.isSynchronizationOperation()) |
| | | { |
| | | // Remove an unsuccessful non-synchronization operation from the pending |
| | | // Remove an unsuccessful non-replication operation from the pending |
| | | // changes list. |
| | | if (curChangeNumber != null) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * get the number of updates received by the synchronization plugin. |
| | | * get the number of updates received by the replication plugin. |
| | | * |
| | | * @return the number of updates received |
| | | */ |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the number of updates sent by the synchronization plugin. |
| | | * Get the number of updates sent by the replication plugin. |
| | | * |
| | | * @return the number of updates sent |
| | | */ |
| | |
| | | } |
| | | |
| | | /** |
| | | * get the number of updates replayed by the synchronization. |
| | | * get the number of updates replayed by the replication. |
| | | * |
| | | * @return The number of updates replayed by the synchronization |
| | | * @return The number of updates replayed by the replication |
| | | */ |
| | | public int getNumProcessedUpdates() |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * get the number of updates replayed successfully by the synchronization. |
| | | * get the number of updates replayed successfully by the replication. |
| | | * |
| | | * @return The number of updates replayed successfully |
| | | */ |
| | |
| | | } |
| | | |
| | | /** |
| | | * Shutdown this SynchronizationDomain. |
| | | * Shutdown this ReplicationDomain. |
| | | */ |
| | | public void shutdown() |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the number of times the synchronization connection was lost. |
| | | * @return The number of times the synchronization connection was lost. |
| | | * Get the number of times the replication connection was lost. |
| | | * @return The number of times the replication connection was lost. |
| | | */ |
| | | public int getNumLostConnections() |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Disable the Synchronization on this domain. |
| | | * The session to the Synchronization server will be stopped. |
| | | * Disable the replication on this domain. |
| | | * The session to the replication server will be stopped. |
| | | * The domain will not be destroyed but call to the pre-operation |
| | | * methods will result in failure. |
| | | * The listener threads will be destroyed. |
| | |
| | | |
| | | /** |
| | | * Enable back the domain after a previous disable. |
| | | * The domain will connect back to a Synchronization Server and |
| | | * The domain will connect back to a replication Server and |
| | | * will recreate threads to listen for messages from the Synchronization |
| | | * server. |
| | | * The ServerState will also be read again from the local database. |
| | |
| | | |
| | | /** |
| | | * Receives bytes related to an entry in the context of an import to |
| | | * initialize the domain (called by SynchronizationDomainLDIFInputStream). |
| | | * initialize the domain (called by ReplLDIFInputStream). |
| | | * |
| | | * @return The bytes. Null when the Done or Err message has been received |
| | | */ |
| | | public byte[] receiveEntryBytes() |
| | | { |
| | | SynchronizationMessage msg; |
| | | ReplicationMessage msg; |
| | | while (true) |
| | | { |
| | | try |
| | |
| | | int msgID = MSGID_UNKNOWN_TYPE; |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "SynchronizationDomain/ " + message, msgID); |
| | | "ReplicationDomain/ " + message, msgID); |
| | | } |
| | | } |
| | | |
| | |
| | | ResultCode.OTHER, message, msgID, null); |
| | | } |
| | | |
| | | SynchroLDIFOutputStream os = new SynchroLDIFOutputStream(this); |
| | | ReplLDIFOutputStream os = new ReplLDIFOutputStream(this); |
| | | |
| | | LDIFExportConfig exportConfig = new LDIFExportConfig(os); |
| | | |
| | |
| | | preBackendImport(this.backend, this.backendConfigEntry); |
| | | |
| | | DN[] baseDNs = {baseDN}; |
| | | ieContext.ldifImportInputStream = new SynchroLDIFInputStream(this); |
| | | ieContext.ldifImportInputStream = new ReplLDIFInputStream(this); |
| | | importConfig = |
| | | new LDIFImportConfig(ieContext.ldifImportInputStream); |
| | | importConfig.setIncludeBranches(this.branches); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Retrieves a synchronization domain based on the baseDN. |
| | | * Retrieves a replication domain based on the baseDN. |
| | | * |
| | | * @param baseDN The baseDN of the domain to retrieve |
| | | * @return The domain retrieved |
| | | * @throws DirectoryException When an error occurred. |
| | | */ |
| | | public static SynchronizationDomain retrievesSynchronizationDomain(DN baseDN) |
| | | public static ReplicationDomain retrievesReplicationDomain(DN baseDN) |
| | | throws DirectoryException |
| | | { |
| | | SynchronizationDomain synchronizationDomain = null; |
| | | ReplicationDomain replicationDomain = null; |
| | | |
| | | // Retrieves the domain |
| | | DirectoryServer.getSynchronizationProviders(); |
| | | for (SynchronizationProvider provider : |
| | | DirectoryServer.getSynchronizationProviders()) |
| | | { |
| | | if (!( provider instanceof MultimasterSynchronization)) |
| | | if (!( provider instanceof MultimasterReplication)) |
| | | { |
| | | int msgID = MSGID_INVALID_PROVIDER; |
| | | String message = getMessage(msgID); |
| | |
| | | message, msgID); |
| | | } |
| | | |
| | | // From the domainDN retrieves the synchronization domain |
| | | SynchronizationDomain sdomain = |
| | | MultimasterSynchronization.findDomain(baseDN, null); |
| | | // From the domainDN retrieves the replication domain |
| | | ReplicationDomain sdomain = |
| | | MultimasterReplication.findDomain(baseDN, null); |
| | | if (sdomain == null) |
| | | { |
| | | int msgID = MSGID_NO_MATCHING_DOMAIN; |
| | |
| | | message, msgID); |
| | | } |
| | | |
| | | if (synchronizationDomain != null) |
| | | if (replicationDomain != null) |
| | | { |
| | | // Should never happen |
| | | int msgID = MSGID_MULTIPLE_MATCHING_DOMAIN; |
| | |
| | | throw new DirectoryException(ResultCode.OTHER, |
| | | message, msgID); |
| | | } |
| | | synchronizationDomain = sdomain; |
| | | replicationDomain = sdomain; |
| | | } |
| | | return synchronizationDomain; |
| | | return replicationDomain; |
| | | } |
| | | |
| | | /** |
| | |
| | | // Check that there is not already a domain with the same DN |
| | | // TODO : Check that the server id is a short |
| | | DN dn = configuration.getSynchronizationDN(); |
| | | if (MultimasterSynchronization.findDomain(dn,null) != null) |
| | | if (MultimasterReplication.findDomain(dn,null) != null) |
| | | { |
| | | String message = getMessage(MSGID_SYNC_INVALID_DN, dn.toString()); |
| | | unacceptableReasons.add(message); |