/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006 Sun Microsystems, Inc. */ package org.opends.server.synchronization; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import static org.opends.server.util.TimeThread.getTime; import static org.opends.server.synchronization.SynchMessages.*; import static org.opends.server.loggers.Error.*; import static org.opends.server.messages.MessageHandler.*; import static org.opends.server.synchronization.OperationContext.SYNCHROCONTEXT; import static org.opends.server.synchronization.Historical.*; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.SortedMap; import java.util.TreeMap; import java.util.zip.DataFormatException; import org.opends.server.api.ConfigurableComponent; import org.opends.server.api.DirectoryThread; import org.opends.server.config.BooleanConfigAttribute; import org.opends.server.config.ConfigAttribute; import org.opends.server.config.ConfigEntry; import org.opends.server.config.ConfigException; import org.opends.server.config.DNConfigAttribute; import 
org.opends.server.config.IntegerConfigAttribute; import org.opends.server.config.StringConfigAttribute; import org.opends.server.core.AddOperation; import org.opends.server.core.DeleteOperation; import org.opends.server.core.DirectoryException; import org.opends.server.core.DirectoryServer; import org.opends.server.core.ModifyDNOperation; import org.opends.server.core.ModifyOperation; import org.opends.server.core.Operation; import org.opends.server.messages.MessageHandler; import org.opends.server.protocols.asn1.ASN1Exception; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.protocols.internal.InternalSearchOperation; import org.opends.server.protocols.ldap.LDAPException; import org.opends.server.types.ConfigChangeResult; import org.opends.server.types.DN; import org.opends.server.types.DereferencePolicy; import org.opends.server.types.Entry; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.types.RDN; import org.opends.server.types.ResultCode; import org.opends.server.types.SearchFilter; import org.opends.server.types.SearchResultEntry; import org.opends.server.types.SearchScope; import org.opends.server.types.SynchronizationProviderResult; /** * This class implements the bulk of the Directory Server side * of the synchronization code. * It contains the root method for publishing a change, * processing a change received from the changelog service, * handling conflict resolution, * and handling protocol messages from the changelog server. 
*/ public class SynchronizationDomain extends DirectoryThread implements ConfigurableComponent { private SynchronizationMonitor monitor; private ChangeNumberGenerator changeNumberGenerator; private ChangelogBroker broker; private List synchroThreads = new ArrayList(); private SortedMap pendingChanges = new TreeMap(); private SortedMap waitingAckMsgs = new TreeMap(); private int numRcvdUpdates = 0; private int numSentUpdates = 0; private int numProcessedUpdates = 0; private int debugCount = 0; private ServerState state; private int numReplayedPostOpCalled = 0; private int maxReceiveQueue = 0; private int maxSendQueue = 0; private int maxReceiveDelay = 0; private int maxSendDelay = 0; private short serverId; private BooleanConfigAttribute receiveStatusStub; private int listenerThreadNumber = 10; private boolean receiveStatus = true; private List changelogServers; private DN baseDN; private List configAttributes = new ArrayList(); private boolean shutdown = false; private DN configDn; private InternalClientConnection conn = new InternalClientConnection(); static String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server"; static String BASE_DN_ATTR = "ds-cfg-synchronization-dn"; static String SERVER_ID_ATTR = "ds-cfg-directory-server-id"; static String RECEIVE_STATUS = "ds-cfg-receive-status"; static String MAX_RECEIVE_QUEUE = "ds-cfg-max-receive-queue"; static String MAX_RECEIVE_DELAY = "ds-cfg-max-receive-delay"; static String MAX_SEND_QUEUE = "ds-cfg-max-send-queue"; static String MAX_SEND_DELAY = "ds-cfg-max-send-delay"; private static final StringConfigAttribute changelogStub = new StringConfigAttribute(CHANGELOG_SERVER_ATTR, "changelog server information", true, true, false); private static final IntegerConfigAttribute serverIdStub = new IntegerConfigAttribute(SERVER_ID_ATTR, "server ID", true, false, false, true, 0, true, 65535); private static final DNConfigAttribute baseDnStub = new DNConfigAttribute(BASE_DN_ATTR, "synchronization base DN", true, false, false); 
/** * Creates a new SynchronizationDomain using configuration from configEntry. * * @param configEntry The ConfigEntry to use to read the configuration of this * SynchronizationDomain. * @throws ConfigException In case of invalid configuration. */ public SynchronizationDomain(ConfigEntry configEntry) throws ConfigException { super("Synchronization flush"); /* * read the centralized changelog server configuration * this is a multivalued attribute */ StringConfigAttribute changelogServer = (StringConfigAttribute) configEntry.getConfigAttribute(changelogStub); if (changelogServer == null) { throw new ConfigException(MSGID_NEED_CHANGELOG_SERVER, MessageHandler.getMessage(MSGID_NEED_CHANGELOG_SERVER, configEntry.getDN().toString()) ); } changelogServers = changelogServer.activeValues(); configAttributes.add(changelogServer); /* * read the server Id information * this is a single valued integer, its value must fit on a short integer */ IntegerConfigAttribute serverIdAttr = (IntegerConfigAttribute) configEntry.getConfigAttribute(serverIdStub); if (serverIdAttr == null) { throw new ConfigException(MSGID_NEED_SERVER_ID, MessageHandler.getMessage(MSGID_NEED_SERVER_ID, configEntry.getDN().toString()) ); } serverId = (short) serverIdAttr.activeIntValue(); configAttributes.add(serverIdAttr); /* * read the base DN */ DNConfigAttribute baseDn = (DNConfigAttribute) configEntry.getConfigAttribute(baseDnStub); if (baseDn == null) baseDN = null; // Attribute is not present : don't set a limit else baseDN = baseDn.activeValue(); /* NOTE(review): baseDn is added to configAttributes even when it is null - confirm this is intended. */ configAttributes.add(baseDn); state = new ServerState(baseDN); state.loadState(); /* * Read the Receive Status. 
*/ receiveStatusStub = new BooleanConfigAttribute(RECEIVE_STATUS, "receive status", false); BooleanConfigAttribute receiveStatusAttr = (BooleanConfigAttribute) configEntry.getConfigAttribute(receiveStatusStub); if (receiveStatusAttr != null) { receiveStatus = receiveStatusAttr.activeValue(); configAttributes.add(receiveStatusAttr); } /* * read the synchronization flow control configuration. */ IntegerConfigAttribute maxReceiveQueueStub = new IntegerConfigAttribute(MAX_RECEIVE_QUEUE, "max receive queue", false, false, false, true, 0,false, 0); IntegerConfigAttribute maxReceiveQueueAttr = (IntegerConfigAttribute) configEntry.getConfigAttribute(maxReceiveQueueStub); if (maxReceiveQueueAttr == null) maxReceiveQueue = 0; // Attribute is not present : don't set a limit else { maxReceiveQueue = maxReceiveQueueAttr.activeIntValue(); configAttributes.add(maxReceiveQueueAttr); } IntegerConfigAttribute maxReceiveDelayStub = new IntegerConfigAttribute(MAX_RECEIVE_DELAY, "max receive delay", false, false, false, true, 0, false, 0); IntegerConfigAttribute maxReceiveDelayAttr = (IntegerConfigAttribute) configEntry.getConfigAttribute(maxReceiveDelayStub); if (maxReceiveDelayAttr == null) maxReceiveDelay = 0; // Attribute is not present : don't set a limit else { maxReceiveDelay = maxReceiveDelayAttr.activeIntValue(); configAttributes.add(maxReceiveDelayAttr); } IntegerConfigAttribute maxSendQueueStub = new IntegerConfigAttribute(MAX_SEND_QUEUE, "max send queue", false, false, false, true, 0, false, 0); IntegerConfigAttribute maxSendQueueAttr = (IntegerConfigAttribute) configEntry.getConfigAttribute(maxSendQueueStub); if (maxSendQueueAttr == null) maxSendQueue = 0; // Attribute is not present : don't set a limit else { maxSendQueue = maxSendQueueAttr.activeIntValue(); configAttributes.add(maxSendQueueAttr); } IntegerConfigAttribute maxSendDelayStub = new IntegerConfigAttribute(MAX_SEND_DELAY, "max send delay", false, false, false, true, 0, false, 0); IntegerConfigAttribute 
maxSendDelayAttr = (IntegerConfigAttribute) configEntry.getConfigAttribute(maxSendDelayStub); if (maxSendDelayAttr == null) maxSendDelay = 0; // Attribute is not present : don't set a limit else { maxSendDelay = maxSendDelayAttr.activeIntValue(); configAttributes.add(maxSendDelayAttr); } configDn = configEntry.getDN(); DirectoryServer.registerConfigurableComponent(this); monitor = new SynchronizationMonitor(this); DirectoryServer.registerMonitorProvider(monitor); // TODO : read RUV from database and make sure we don't // generate changeNumber smaller than ChangeNumbers in the RUV long startingChangeNumber = getTime(); changeNumberGenerator = new ChangeNumberGenerator(serverId, startingChangeNumber); /* * create the broker object used to publish and receive changes */ try { broker = new ChangelogBroker(this); synchronized (broker) { broker.start(serverId, changelogServers); if (!receiveStatus) broker.suspendReceive(); } } catch (Exception e) { /* TODO should mark that changelog service is * not available, log an error and retry upon timeout * should we stop the modifications ? * NOTE(review): until this TODO is done the exception is silently discarded. 
*/ } } /** * {@inheritDoc} */ public DN getConfigurableComponentEntryDN() { return configDn; } /** * {@inheritDoc} */ public List getConfigurationAttributes() { return configAttributes; } /** * {@inheritDoc} * The configuration is reported as acceptable only when the changelog * server attribute can be read and is present. */ public boolean hasAcceptableConfiguration(ConfigEntry configEntry, List unacceptableReasons) { boolean acceptable = true; StringConfigAttribute changelog = null; try { changelog = (StringConfigAttribute) configEntry.getConfigAttribute(changelogStub); } catch (ConfigException e) { acceptable = false; unacceptableReasons.add("Need at least one changelog server."); } /* NOTE(review): when the catch block above ran, changelog is still null, so the same reason is added a second time here. */ if (changelog == null) { acceptable = false; unacceptableReasons.add("Need at least one changelog server."); } return acceptable; } /** * {@inheritDoc} */ public ConfigChangeResult applyNewConfiguration(ConfigEntry configEntry, boolean detailedResults) { StringConfigAttribute changelog = null; List newChangelogServers; boolean newReceiveStatus; try { /* * check if the changelog server list changed */ changelog = (StringConfigAttribute) configEntry.getConfigAttribute(changelogStub); newChangelogServers = changelog.activeValues(); boolean sameConf = true; for (String s :newChangelogServers) if (!changelogServers.contains(s)) sameConf = false; for (String s : changelogServers) if (!newChangelogServers.contains(s)) sameConf = false; if (!sameConf) { broker.stop(); changelogServers = newChangelogServers; broker.start(serverId, changelogServers); } /* * check if reception should be disabled * NOTE(review): the constructor treats the receive status attribute as * optional (null check), but the result is dereferenced directly here - confirm. 
*/ newReceiveStatus = ((BooleanConfigAttribute) configEntry.getConfigAttribute(receiveStatusStub)).activeValue(); if (newReceiveStatus != receiveStatus) { /* * was disabled and moved to enabled */ if (newReceiveStatus) { broker.restartReceive(); for (int i=0; i /* NOTE(review): source text appears to have been lost at this point; the remainder of applyNewConfiguration and the beginning of the following entry-uuid lookup method are not present - restore from version control. */ attrs = new LinkedHashSet(1); attrs.add(ENTRYUIDNAME); InternalSearchOperation search = conn.processSearch(dn, SearchScope.BASE_OBJECT, DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0, false, SearchFilter.createFilterFromString("objectclass=*"), attrs); if 
(search.getResultCode() == ResultCode.SUCCESS) { LinkedList result = search.getSearchEntries(); if (!result.isEmpty()) { SearchResultEntry resultEntry = result.getFirst(); if (resultEntry != null) { return Historical.getEntryUuid(resultEntry); } } } } catch (DirectoryException e) { // never happens because the filter is always valid. } return null; } /** * Finds the current DN of an entry from its entry uuid, by searching the * whole subtree below the base DN with an entryuuid equality filter. * * @param uuid the Entry Unique ID. * @return The current DN of the entry, or null if the search does not * succeed or there is no entry with the specified uuid. */ private DN findEntryDN(String uuid) { try { InternalSearchOperation search = conn.processSearch(baseDN, SearchScope.WHOLE_SUBTREE, SearchFilter.createFilterFromString("entryuuid="+uuid)); if (search.getResultCode() == ResultCode.SUCCESS) { LinkedList result = search.getSearchEntries(); if (!result.isEmpty()) { SearchResultEntry resultEntry = result.getFirst(); if (resultEntry != null) { return resultEntry.getDN(); } } } } catch (DirectoryException e) { // never happens because the filter is always valid. } return null; } /** * Solve a conflict detected when replaying a modify operation. * * @param op The operation that triggered the conflict detection. * @param msg The update message being replayed; its DN is replaced when * the result code is NO_SUCH_OBJECT. * @return true if the process is completed, false if it must continue. */ private boolean solveNamingConflict(ModifyOperation op, UpdateMessage msg) { ResultCode result = op.getResultCode(); ModifyContext ctx = (ModifyContext) op.getAttachment(SYNCHROCONTEXT); String entryUid = ctx.getEntryUid(); if (result == ResultCode.NO_SUCH_OBJECT) { /* * This error may happen when the operation is a modification but * the entry has been renamed on a different master at the same time. 
* Search for the entry by its unique id and store its current dn * in the message. * NOTE(review): findEntryDN returns null when no entry is found, in * which case the toString() call below throws NullPointerException. */ msg.setDn(findEntryDN(entryUid).toString()); return false; } return true; } /** Solve a conflict detected when replaying a delete operation. 
* * @param op The operation that triggered the conflict detection. * @param msg The update message being replayed; its DN is replaced when * the entry is found under a different DN. * @return true if the process is completed, false if it must continue. */ private boolean solveNamingConflict(DeleteOperation op, UpdateMessage msg) { ResultCode result = op.getResultCode(); DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT); String entryUid = ctx.getEntryUid(); if (result == ResultCode.NO_SUCH_OBJECT) { /* * Find if the entry is still in the database. */ DN currentDn = findEntryDN(entryUid); if (currentDn == null) { /* * The entry has already been deleted, either because this delete * has already been replayed or because another concurrent delete * has already done the job. * In any case, there is nothing more to do. */ return true; } else { /* * This entry has been renamed, replay the delete using its new DN. */ msg.setDn(currentDn.toString()); return false; } } else if (result == ResultCode.NOT_ALLOWED_ON_NONLEAF) { /* * This may happen when we replay a DELETE done on a master * but children of this entry have been added on another master. */ /* * TODO : either delete all the children or rename the children below * the top suffix by adding entryuuid in dn and delete this entry. * Until then this case falls through and reports the process as completed. */ } return true; } /** * Solve a conflict detected when replaying an ADD operation. * * @param op The operation that triggered the conflict detection. * @param msg The update message being replayed; its DN is replaced when * the operation must be retried under another DN. * @return true if the process is completed, false if it must continue. * @throws Exception When the operation is not valid. 
*/ private boolean solveNamingConflict(AddOperation op, UpdateMessage msg) throws Exception { ResultCode result = op.getResultCode(); AddContext ctx = (AddContext) op.getAttachment(SYNCHROCONTEXT); String entryUid = ctx.getEntryUid(); String parentUniqueId = ctx.getParentUid(); if (result == ResultCode.NO_SUCH_OBJECT) { /* * This can happen if the parent has been renamed or deleted * find the parent dn and calculate a new dn for the entry */ if (parentUniqueId == null) { /* * This entry is the base dn of the backend. * It is quite weird that the operation result be NO_SUCH_OBJECT. * There is nothing more we can do except TODO log a * message for the repair tool to look at this problem. */ return true; } DN parentDn = findEntryDN(parentUniqueId); if (parentDn == null) { /* * The parent has been deleted, so this entry should not * exist : don't do the ADD. */ return true; } else { RDN entryRdn = op.getEntryDN().getRDN(); msg.setDn(parentDn + "," + entryRdn); return false; } } else if (result == ResultCode.ENTRY_ALREADY_EXISTS) { /* * This can happen if * - two adds are done on different servers but with the * same target DN. * - the same ADD is being replayed for the second time on this server. * If the entry unique id already exists, assume this is a replay and * don't do anything. * If the entry unique id does not exist, generate a conflict. */ if (findEntryDN(entryUid) != null) { // entry already exist : this is a replay return true; } else { addConflict(op); msg.setDn(generateConflictDn(entryUid, msg.getDn())); return false; } } return true; } /** * Solve a conflict detected when replaying a Modify DN operation. * * @param op The operation that triggered the conflict detection. * @param msg The update message being replayed; it is cast to ModifyDNMsg * when the result code is NO_SUCH_OBJECT. * @return true if the process is completed, false if it must continue. * @throws Exception When the operation is not valid. 
*/ private boolean solveNamingConflict(ModifyDNOperation op, UpdateMessage msg) throws Exception { ResultCode result = op.getResultCode(); ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT); String entryUid = ctx.getEntryUid(); String newSuperiorID = ctx.getNewParentId(); if (result == ResultCode.NO_SUCH_OBJECT) { ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg; /* * four possible cases : * - the modified entry has been renamed * - the new parent has been renamed * - the operation is replayed for the second time. * - the entry has been deleted * action : * - change the target dn and the new parent dn and * restart the operation, * - don't do anything if the operation is replayed. */ // Construct the new DN to use for the entry. DN entryDN = op.getEntryDN(); DN newSuperior = findEntryDN(newSuperiorID); RDN newRDN = op.getNewRDN(); DN parentDN; if (newSuperior == null) { parentDN = entryDN.getParent(); } else { parentDN = newSuperior; } if ((parentDN == null) || parentDN.isNullDN()) { /* this should never happen * can't solve any conflict in this case. */ throw new Exception("operation parameters are invalid"); } RDN[] parentComponents = parentDN.getRDNComponents(); RDN[] newComponents = new RDN[parentComponents.length+1]; System.arraycopy(parentComponents, 0, newComponents, 1, parentComponents.length); newComponents[0] = newRDN; DN newDN = new DN(newComponents); // get the current DN of this entry in the database. DN currentDN = findEntryDN(entryUid); // if the newDN and the current DN match then the operation // is a no-op (this was probably a second replay) // don't do anything. 
if (newDN.equals(currentDN)) { return true; } /* NOTE(review): currentDN is null when findEntryDN finds no entry for entryUid, and newSuperior is null when the new parent was not found (only parentDN falls back to the entry's parent above); either toString() call below would then throw NullPointerException - confirm. */ msg.setDn(currentDN.toString()); modifyDnMsg.setNewSuperior(newSuperior.toString()); return false; } else if (result == ResultCode.ENTRY_ALREADY_EXISTS) { /* * This may happen when two modifyDn operations * are done on different servers but with the same target DN : * add the conflict object class to the entry * and rename it using its entryuuid. */ generateAddConflictOp(op); msg.setDn(generateConflictDn(entryUid, msg.getDn())); return false; } return true; } /** * Generate a modification to add the conflict ObjectClass to an entry * whose Dn is now conflicting with another entry. * Not implemented yet : the body is currently empty. * * @param op The operation causing the conflict. */ private void generateAddConflictOp(ModifyDNOperation op) { // TODO } /** * Add the conflict object class to an entry that could * not be added because it is conflicting with another entry. * Not implemented yet : the body is currently empty. * * @param addOp The conflicting Add Operation. */ private void addConflict(AddOperation addOp) { /* * TODO */ } /** * Generate the Dn to use for a conflicting entry. * The result is the original dn immediately followed by "entryuuid=" * and the entry unique id. * NOTE(review): no separator is inserted between the original dn and the * entryuuid component - confirm that this yields the intended DN. * * @param entryUid The unique id of the conflicting entry. * @param dn Original dn. * @return The generated Dn for a conflicting entry. */ private String generateConflictDn(String entryUid, String dn) { return dn + "entryuuid=" + entryUid; } /** * Check if an operation must be processed as an assured operation. 
* Currently always returns false. * * @param op the operation to be checked. * @return true if the operations must be processed as an assured operation. */ private boolean isAssured(Operation op) { // TODO : should have a filtering mechanism for checking // operation that are assured and operations that are not. return false; } /** * Push all committed local changes to the changelog service. * Changes are taken from the head of pendingChanges for as long as the * first pending change is committed; each one is recorded in the server * state and removed, and it is published through the broker unless its * operation is a synchronization operation. * PRECONDITION : The pendingChanges lock must be held before calling * this method. 
*/ private void pushCommittedChanges() { if (pendingChanges.isEmpty()) return; ChangeNumber firstChangeNumber = pendingChanges.firstKey(); PendingChange firstChange = pendingChanges.get(firstChangeNumber); while ((firstChange != null) && firstChange.isCommitted()) { if (firstChange.getOp().isSynchronizationOperation() == false) { numSentUpdates++; broker.publish(firstChange.getMsg()); } state.update(firstChangeNumber); pendingChanges.remove(firstChangeNumber); if (pendingChanges.isEmpty()) { firstChange = null; } else { firstChangeNumber = pendingChanges.firstKey(); firstChange = pendingChanges.get(firstChangeNumber); } } } /** * Check if a ConfigEntry is valid. * The entry is valid when both the changelog server attribute and the * server ID attribute can be read and are present. * @param configEntry The config entry that needs to be checked. * @param unacceptableReason A description of the reason why the config entry * is not acceptable (if return is false). * @return a boolean indicating if the configEntry is valid. */ public static boolean checkConfigEntry(ConfigEntry configEntry, StringBuilder unacceptableReason) { try { StringConfigAttribute changelogServer = (StringConfigAttribute) configEntry.getConfigAttribute(changelogStub); if (changelogServer == null) { unacceptableReason.append( MessageHandler.getMessage(MSGID_NEED_CHANGELOG_SERVER, configEntry.getDN().toString()) ); return false; } /* * read the server Id information * this is a single valued integer, its value must fit on a short integer */ IntegerConfigAttribute serverIdAttr = (IntegerConfigAttribute) configEntry.getConfigAttribute(serverIdStub); if (serverIdAttr == null) { unacceptableReason.append( MessageHandler.getMessage(MSGID_NEED_SERVER_ID, configEntry.getDN().toString()) ); return false; } } catch (ConfigException e) { unacceptableReason.append(e.getMessage()); return false; } return true; } }