opends/resource/config/config.ldif
@@ -126,7 +126,6 @@ ds-cfg-index-type: presence ds-cfg-index-type: equality ds-cfg-index-type: substring ds-cfg-index-entry-limit: 4000 dn: ds-cfg-index-attribute=givenName,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config objectClass: top @@ -135,7 +134,6 @@ ds-cfg-index-type: presence ds-cfg-index-type: equality ds-cfg-index-type: substring ds-cfg-index-entry-limit: 4000 dn: ds-cfg-index-attribute=mail,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config objectClass: top @@ -144,14 +142,12 @@ ds-cfg-index-type: presence ds-cfg-index-type: equality ds-cfg-index-type: substring ds-cfg-index-entry-limit: 4000 dn: ds-cfg-index-attribute=member,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config objectClass: top objectClass: ds-cfg-je-index ds-cfg-index-attribute: member ds-cfg-index-type: equality ds-cfg-index-entry-limit: 4000 dn: ds-cfg-index-attribute=sn,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config objectClass: top @@ -160,7 +156,6 @@ ds-cfg-index-type: presence ds-cfg-index-type: equality ds-cfg-index-type: substring ds-cfg-index-entry-limit: 4000 dn: ds-cfg-index-attribute=telephoneNumber,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config objectClass: top @@ -169,22 +164,24 @@ ds-cfg-index-type: presence ds-cfg-index-type: equality ds-cfg-index-type: substring ds-cfg-index-entry-limit: 4000 dn: ds-cfg-index-attribute=uid,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config objectClass: top objectClass: ds-cfg-je-index ds-cfg-index-attribute: uid ds-cfg-index-type: equality ds-cfg-index-entry-limit: 4000 dn: ds-cfg-index-attribute=ds-sync-hist,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config objectClass: top objectClass: ds-cfg-je-index objectClass: extensibleObject ds-cfg-index-attribute: ds-sync-hist ds-cfg-index-type: ordering ds-cfg-index-entry-limit: 4000 dn: ds-cfg-index-attribute=entryuuid,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config objectClass: top objectClass: ds-cfg-je-index ds-cfg-index-attribute: entryuuid ds-cfg-index-type: equality dn: cn=JE Database,ds-cfg-backend-id=userRoot,cn=Backends,cn=config objectClass: top opends/src/server/org/opends/server/changelog/ChangelogCache.java
@@ -265,8 +265,6 @@ { /* * create the balanced tree that will be used to forward changes * TODO initialize it will all the previous changes that this replicaID * has not seen */ synchronized (connectedServers) { @@ -305,8 +303,6 @@ { /* * create the balanced tree that will be used to forward changes * TODO initialize it will all the previous changes that this replicaID * has not seen * TODO throw proper exception */ synchronized (changelogServers) @@ -482,7 +478,6 @@ public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver, short serverId) { // TODO Auto-generated method stub ServerHandler handler; if (isLDAPserver) handler = connectedServers.get(serverId); opends/src/server/org/opends/server/changelog/ChangelogDB.java
@@ -349,7 +349,7 @@ catch (DatabaseException dbe) { } /* database is faulty : TODO : log better message */ /* database is faulty */ int msgID = MSGID_CHANGELOG_SHUTDOWN_DATABASE_ERROR; String message = getMessage(msgID) + stackTraceToSingleLineString(e); logError(ErrorLogCategory.SYNCHRONIZATION, @@ -553,7 +553,8 @@ * to continue with the next record. * In such case, it is therefore possible that we miss some changes. * TODO. log an error message. * TODO. Such problem should be handled by the repair functionality. * TODO : REPAIR : Such problem should be handled by the * repair functionality. */ } } opends/src/server/org/opends/server/core/ModifyDNOperation.java
@@ -1137,36 +1137,6 @@ } // Invoke any conflict resolution processing that might be needed by the // synchronization provider. for (SynchronizationProvider provider : DirectoryServer.getSynchronizationProviders()) { try { SynchronizationProviderResult result = provider.handleConflictResolution(this); if (! result.continueOperationProcessing()) { break modifyDNProcessing; } } catch (DirectoryException de) { assert debugException(CLASS_NAME, "run", de); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, MSGID_MODDN_SYNCH_CONFLICT_RESOLUTION_FAILED, getConnectionID(), getOperationID(), stackTraceToSingleLineString(de)); setResponseData(de); break modifyDNProcessing; } } // Get the current entry from the appropriate backend. If it doesn't // exist, then fail. try @@ -1215,6 +1185,37 @@ break modifyDNProcessing; } // Invoke any conflict resolution processing that might be needed by the // synchronization provider. for (SynchronizationProvider provider : DirectoryServer.getSynchronizationProviders()) { try { SynchronizationProviderResult result = provider.handleConflictResolution(this); if (! result.continueOperationProcessing()) { break modifyDNProcessing; } } catch (DirectoryException de) { assert debugException(CLASS_NAME, "run", de); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, MSGID_MODDN_SYNCH_CONFLICT_RESOLUTION_FAILED, getConnectionID(), getOperationID(), stackTraceToSingleLineString(de)); setResponseData(de); break modifyDNProcessing; } } // Check to see if the client has permission to perform the // modify DN. opends/src/server/org/opends/server/synchronization/AddContext.java
New file @@ -0,0 +1,62 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006 Sun Microsystems, Inc. */ package org.opends.server.synchronization; /** * This class describe the Synchronization contexte that is attached to * Add Operation. */ public class AddContext extends OperationContext { /** * The Unique Id of the parent entry od the added entry. */ private String parentUid; /** * Creates a new AddContext with the provided information. * * @param changeNumber The change number of the add operation. * @param uid the Unique Id of the added entry. * @param parentUid The unique Id of the parent of the added entry. */ public AddContext(ChangeNumber changeNumber, String uid, String parentUid) { super(changeNumber, uid); this.parentUid = parentUid; } /** * Get the Unique Id of the parent of the added entry. * * @return Returns the Unique Id of the parent of the added entry. */ public String getParentUid() { return parentUid; } } opends/src/server/org/opends/server/synchronization/AddMsg.java
@@ -45,7 +45,7 @@ import org.opends.server.types.Attribute; import org.opends.server.types.AttributeValue; import static org.opends.server.synchronization.SynchMessages.*; import static org.opends.server.synchronization.OperationContext.*; import static org.opends.server.util.StaticUtils.toLowerCase; /** @@ -55,8 +55,8 @@ public class AddMsg extends UpdateMessage { private static final long serialVersionUID = -4905520652801395185L; private String dn; private byte[] encodedAttributes ; private String parentUniqueId; /** * Creates a new AddMessage. @@ -64,6 +64,12 @@ */ public AddMsg(AddOperation op) { super((AddContext) op.getAttachment(SYNCHROCONTEXT), op.getRawEntryDN().stringValue()); AddContext ctx = (AddContext) op.getAttachment(SYNCHROCONTEXT); this.parentUniqueId = ctx.getParentUid(); // Encode the object classes (SET OF LDAPString). LinkedHashSet<AttributeValue> ocValues = new LinkedHashSet<AttributeValue>(op.getObjectClasses().size()); @@ -99,12 +105,8 @@ } } dn = op.getRawEntryDN().stringValue(); // Encode the sequence. encodedAttributes = ASN1Element.encodeValue(elems); changeNumber = (ChangeNumber) op.getAttachment(SYNCHRONIZATION); } /** @@ -112,18 +114,22 @@ * * @param cn ChangeNumber of the add. * @param dn DN of the added entry. * @param uniqueId The Unique identifier of the added entry. * @param parentId The unique Id of the parent of the added entry. * @param objectClass objectclass of the added entry. * @param userAttributes user attributes of the added entry. * @param operationalAttributes operational attributes of the added entry. */ public AddMsg(ChangeNumber cn, String dn, String uniqueId, String parentId, Attribute objectClass, Collection<Attribute> userAttributes, Collection<Attribute> operationalAttributes) { this.dn = dn; this.changeNumber = cn; super (new AddContext(cn, uniqueId, parentId), dn); this.parentUniqueId = parentId; ArrayList<ASN1Element> elems = new ArrayList<ASN1Element>(); elems.add(new LDAPAttribute(objectClass).encode()); @@ -142,40 +148,29 @@ * * @param in The byte[] from which the operation must be read. * @throws DataFormatException The input byte[] is not a valid AddMsg * @throws UnsupportedEncodingException If UTF8 is not supported by the jvm */ public AddMsg(byte[] in) throws DataFormatException public AddMsg(byte[] in) throws DataFormatException, UnsupportedEncodingException { /* first byte is the type */ if (in[0] != MSG_TYPE_ADD_REQUEST) throw new DataFormatException("byte[] is not a valid add msg"); int pos = 1; super(in); /* read the dn * first calculate the length then construct the string */ int length = 0; int offset = pos; while (in[pos++] != 0) int pos = decodeHeader(MSG_TYPE_ADD_REQUEST, in); // read the parent unique Id int length = getNextLength(in, pos); if (length != 0) { if (pos > in.length) throw new DataFormatException("byte[] is not a valid add msg"); length++; parentUniqueId = new String(in, pos, length, "UTF-8"); pos += length + 1; } try else { dn = new String(in, offset, length, "UTF-8"); /* read the changeNumber * it is always 24 characters long */ String changenumberStr = new String(in, pos, 24, "UTF-8"); changeNumber = new ChangeNumber(changenumberStr); pos +=24; } catch (UnsupportedEncodingException e ) { throw new DataFormatException("UTF-8 is not supported by this jvm."); parentUniqueId = null; pos += 1; } /* Read the attributes : all the remaining bytes */ // Read the attributes : all the remaining bytes encodedAttributes = new byte[in.length-pos]; int i =0; while (pos<in.length) @@ -185,14 +180,11 @@ } /** * Create and return an AddOperation from a received ADD message. * @param connection The connection where we received the message. * @return The created operation. * @throws LDAPException In case msg encoding was not valid. * @throws ASN1Exception In case msg encoding was not valid. * {@inheritDoc} */ @Override public AddOperation createOperation(InternalClientConnection connection) public AddOperation createOperation(InternalClientConnection connection, String newDn) throws LDAPException, ASN1Exception { ArrayList<LDAPAttribute> attr = new ArrayList<LDAPAttribute>(); @@ -207,9 +199,10 @@ AddOperation add = new AddOperation(connection, InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(), null, new ASN1OctetString(dn), attr); add.setAttachment(SYNCHRONIZATION, getChangeNumber()); new ASN1OctetString(newDn), attr); AddContext ctx = new AddContext(getChangeNumber(), getUniqueId(), parentUniqueId); add.setAttachment(SYNCHROCONTEXT, ctx); return add; } @@ -220,36 +213,30 @@ @Override public byte[] getBytes() { byte[] byteDn; try { byteDn = dn.getBytes("UTF-8"); /* The ad message is stored in the form : * <operation type><dn><changenumber><attributes> * the length of result byte array is therefore : * 1 + dn length + 1 + 24 + attribute length */ int length = 1 + byteDn.length + 1 + 24 + encodedAttributes.length; byte[] resultByteArray = new byte[length]; int pos = 1; /* put the type of the operation */ resultByteArray[0] = MSG_TYPE_ADD_REQUEST; /* put the DN and a terminating 0 */ for (int i = 0; i< byteDn.length; i++,pos++) int length = encodedAttributes.length; byte[] byteParentId = null; if (parentUniqueId != null) { resultByteArray[pos] = byteDn[i]; byteParentId = parentUniqueId.getBytes("UTF-8"); length += byteParentId.length + 1; } resultByteArray[pos++] = 0; /* put the ChangeNumber */ byte[] changeNumberByte = this.getChangeNumber().toString().getBytes("UTF-8"); for (int i=0; i<24; i++,pos++) else { resultByteArray[pos] = changeNumberByte[i]; length += 1; } /* encode the header in a byte[] large enough to also contain the mods */ byte [] resultByteArray = encodeHeader(MSG_TYPE_ADD_REQUEST, length); int pos = resultByteArray.length - length; if (byteParentId != null) pos = addByteArray(byteParentId, resultByteArray, pos); else resultByteArray[pos++] = 0; /* put the attributes */ for (int i=0; i<encodedAttributes.length; i++,pos++) { @@ -271,6 +258,6 @@ @Override public String toString() { return ("ADD " + dn + " " + getChangeNumber()); return ("ADD " + getDn() + " " + getChangeNumber()); } } opends/src/server/org/opends/server/synchronization/DeleteContext.java
New file @@ -0,0 +1,44 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006 Sun Microsystems, Inc. */ package org.opends.server.synchronization; /** * This class is used to describe the context attached to a Delete Operation. */ public class DeleteContext extends OperationContext { /** * Creates a new DeleteContext with the provided information. * * @param changeNumber The change number of the Delete Operation. * @param uid The unique Id of the deleted entry. */ public DeleteContext(ChangeNumber changeNumber, String uid) { super(changeNumber, uid); } } opends/src/server/org/opends/server/synchronization/DeleteMsg.java
@@ -26,7 +26,7 @@ */ package org.opends.server.synchronization; import static org.opends.server.synchronization.SynchMessages.SYNCHRONIZATION; import static org.opends.server.synchronization.OperationContext.*; import java.io.UnsupportedEncodingException; import java.util.zip.DataFormatException; @@ -41,7 +41,6 @@ */ public class DeleteMsg extends UpdateMessage { private String dn; private static final long serialVersionUID = -4905520652801395185L; /** @@ -50,8 +49,8 @@ */ public DeleteMsg(DeleteOperation op) { dn = op.getRawEntryDN().stringValue(); changeNumber = (ChangeNumber) op.getAttachment(SYNCHRONIZATION); super((OperationContext) op.getAttachment(SYNCHROCONTEXT), op.getRawEntryDN().stringValue()); } /** @@ -59,55 +58,29 @@ * * @param in The byte[] from which the operation must be read. * @throws DataFormatException The input byte[] is not a valid AddMsg * @throws UnsupportedEncodingException If UTF8 is not supported by the jvm */ public DeleteMsg(byte[] in) throws DataFormatException public DeleteMsg(byte[] in) throws DataFormatException, UnsupportedEncodingException { /* first byte is the type */ if (in[0] != MSG_TYPE_DELETE_REQUEST) throw new DataFormatException("byte[] is not a valid delete msg"); int pos = 1; /* read the dn * first calculate the length then construct the string */ int length = 0; int offset = pos; while (in[pos++] != 0) { if (pos > in.length) throw new DataFormatException("byte[] is not a valid delete msg"); length++; } try { dn = new String(in, offset, length, "UTF-8"); /* read the changeNumber * it is always 24 characters long */ String changenumberStr = new String(in, pos, 24, "UTF-8"); changeNumber = new ChangeNumber(changenumberStr); } catch (UnsupportedEncodingException e) { throw new DataFormatException("UTF-8 is not supported by this jvm."); } super(in); decodeHeader(MSG_TYPE_DELETE_REQUEST, in); } /** * Create an Operation from a delete Message. * * @param connection the connection * @return the Operation from which the message was received * {@inheritDoc} */ @Override public Operation createOperation(InternalClientConnection connection) public Operation createOperation(InternalClientConnection connection, String newDn) { DeleteOperation del = new DeleteOperation(connection, InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(), null, new ASN1OctetString(dn)); del.setAttachment(SYNCHRONIZATION, getChangeNumber()); new ASN1OctetString(newDn)); DeleteContext ctx = new DeleteContext(getChangeNumber(), getUniqueId()); del.setAttachment(SYNCHROCONTEXT, ctx); return del; } @@ -119,44 +92,14 @@ @Override public byte[] getBytes() { byte[] byteDn; try { byteDn = dn.getBytes("UTF-8"); /* The Delete message is stored in the form : * <operation type><dn><changenumber> * the length of result byte array is therefore : * 1 + dn length + 1 + 24 */ int length = 1 + byteDn.length + 1 + 24; byte[] resultByteArray = new byte[length]; int pos = 1; /* put the type of the operation */ resultByteArray[0] = MSG_TYPE_DELETE_REQUEST; /* put the DN and a terminating 0 */ for (int i = 0; i< byteDn.length; i++,pos++) { resultByteArray[pos] = byteDn[i]; } resultByteArray[pos++] = 0; /* put the ChangeNumber */ byte[] changeNumberByte = this.getChangeNumber().toString().getBytes("UTF-8"); for (int i=0; i<24; i++,pos++) { resultByteArray[pos] = changeNumberByte[i]; } return resultByteArray; return encodeHeader(MSG_TYPE_DELETE_REQUEST, 0); } catch (UnsupportedEncodingException e) { // should never happen : TODO : log error properly return null; } return null; } /** @@ -165,6 +108,6 @@ @Override public String toString() { return ("DEL " + dn + " " + getChangeNumber()); return ("DEL " + getDn() + " " + getChangeNumber()); } } opends/src/server/org/opends/server/synchronization/Historical.java
@@ -35,6 +35,7 @@ import java.util.Set; import java.util.TreeMap; import org.opends.server.core.AddOperation; import org.opends.server.core.DirectoryServer; import org.opends.server.core.ModifyOperation; import org.opends.server.types.Attribute; @@ -44,7 +45,6 @@ import org.opends.server.types.Modification; import org.opends.server.types.ModificationType; import static org.opends.server.synchronization.SynchMessages.*; /** * This class is used to store historical information that is @@ -68,6 +68,9 @@ static final String HISTORICALATTRIBUTENAME = "ds-sync-hist"; static final AttributeType historicalAttrType = DirectoryServer.getSchema().getAttributeType(HISTORICALATTRIBUTENAME); static final String ENTRYUIDNAME = "entryuuid"; static final AttributeType entryuuidAttrType = DirectoryServer.getSchema().getAttributeType(ENTRYUIDNAME); /* * The last update seen on this entry, allows fast conflict detection. @@ -109,7 +112,7 @@ { List<Modification> mods = modifyOperation.getModifications(); ChangeNumber changeNumber = (ChangeNumber) modifyOperation.getAttachment(SYNCHRONIZATION); OperationContext.getChangeNumber(modifyOperation); for (Iterator modsIterator = mods.iterator(); modsIterator.hasNext();) { @@ -288,7 +291,8 @@ List<Modification> mods = modifyOperation.getModifications(); Entry modifiedEntry = modifyOperation.getModifiedEntry(); ChangeNumber changeNumber = (ChangeNumber) modifyOperation.getAttachment(SYNCHRONIZATION); OperationContext.getChangeNumber(modifyOperation); /* * If this is a local operation we need first to update the historical * information, then update the entry with the historical information @@ -802,7 +806,7 @@ } catch (Exception e) { /* * TODO This Exception shows that there are some * TODO : REPAIR : This Exception shows that there are some * inconsistency in the historical information. * This method can't fix the problem. * This should be logged and somehow the repair @@ -812,13 +816,70 @@ } else { modifyFakeOperation = new ModifyFakeOperation(entry.getDN(),cn); modifyFakeOperation.addModification(mod); operations.put(histVal.getCn(), modifyFakeOperation); String uuidString = getEntryUuid(entry); if (uuidString != null) { modifyFakeOperation = new ModifyFakeOperation(entry.getDN(), cn, uuidString); modifyFakeOperation.addModification(mod); operations.put(histVal.getCn(), modifyFakeOperation); } } } } } return operations.values(); } /** * Get the entry unique Id in String form. * * @param entry The entry for which the unique id should be returned. * * @return The Unique Id of the entry if it has one. null, otherwise. */ public static String getEntryUuid(Entry entry) { String uuidString = null; List<Attribute> uuidAttrs = entry.getOperationalAttribute(entryuuidAttrType); if (uuidAttrs != null) { Attribute uuid = uuidAttrs.get(0); if (uuid.hasValue()) { AttributeValue uuidVal = uuid.getValues().iterator().next(); uuidString = uuidVal.getStringValue(); } } return uuidString; } /** * Get the Entry Unique Id from an add operation. * This must be called after the entry uuid preop plugin (i.e no * sooner than the synchronization provider pre-op) * * @param op The operation * @return The Entry Unique Id String form. */ public static String getEntryUuid(AddOperation op) { String uuidString = null; Map<AttributeType, List<Attribute>> attrs = op.getOperationalAttributes(); List<Attribute> uuidAttrs = attrs.get(entryuuidAttrType); if (uuidAttrs != null) { Attribute uuid = uuidAttrs.get(0); if (uuid.hasValue()) { AttributeValue uuidVal = uuid.getValues().iterator().next(); uuidString = uuidVal.getStringValue(); } } return uuidString; } } opends/src/server/org/opends/server/synchronization/ListenerThread.java
@@ -26,22 +26,15 @@ */ package org.opends.server.synchronization; import java.util.zip.DataFormatException; import org.opends.server.api.DirectoryThread; import org.opends.server.core.Operation; import org.opends.server.protocols.asn1.ASN1Exception; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.protocols.ldap.LDAPException; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.types.ResultCode; import static org.opends.server.loggers.Error.logError; import static org.opends.server.messages.MessageHandler.getMessage; import static org.opends.server.synchronization.SynchMessages.*; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import org.opends.server.api.DirectoryThread; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; /** * Thread that is used to get messages from the Changelog servers * and replay them in the current server. @@ -49,22 +42,17 @@ public class ListenerThread extends DirectoryThread { private SynchronizationDomain listener; private ChangeNumberGenerator CNgen; private boolean shutdown = false; /** * Constructor for the ListenerThread. * * @param listener the Plugin that created this thread * @param gen the Generator to use to get new ChangeNumber */ public ListenerThread(SynchronizationDomain listener, ChangeNumberGenerator gen) public ListenerThread(SynchronizationDomain listener) { super("Listener thread"); super("Synchronization Listener thread"); this.listener = listener; this.CNgen = gen; setName("Synchronization Listener"); } /** @@ -80,82 +68,24 @@ */ public void run() { InternalClientConnection conn = new InternalClientConnection(); UpdateMessage msg; while (((msg = listener.receive()) != null) && (shutdown == false)) try { Operation op; try while (((msg = listener.receive()) != null) && (shutdown == false)) { op = msg.createOperation(conn); op.setInternalOperation(true); op.setSynchronizationOperation(true); ChangeNumber changeNumber = (ChangeNumber) op.getAttachment(SYNCHRONIZATION); if (changeNumber != null) CNgen.adjust(changeNumber); try { op.run(); if (op.getResultCode() != ResultCode.SUCCESS) { int msgID = MSGID_ERROR_REPLAYING_OPERATION; String message = getMessage(msgID, op.getResultCode().getResultCodeName(), changeNumber.toString(), op.toString(), op.getErrorMessage()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); listener.updateError(changeNumber); } } catch (Exception e) { int msgID = MSGID_EXCEPTION_REPLAYING_OPERATION; String message = getMessage(msgID, stackTraceToSingleLineString(e), op.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); listener.updateError(changeNumber); } listener.replay(msg); } catch (ASN1Exception e) { int msgID = MSGID_EXCEPTION_DECODING_OPERATION; String message = getMessage(msgID, msg) + stackTraceToSingleLineString(e); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } catch (LDAPException e) { int msgID = MSGID_EXCEPTION_DECODING_OPERATION; String message = getMessage(msgID, msg) + stackTraceToSingleLineString(e); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } catch (DataFormatException e) { int msgID = MSGID_EXCEPTION_DECODING_OPERATION; String message = getMessage(msgID, msg) + stackTraceToSingleLineString(e); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } finally { if (msg.isAssured()) listener.ack(msg.getChangeNumber()); listener.incProcessedUpdates(); } } catch (Exception e) { /* * catch all exceptions happening in listener.receive and listener.replay * so that the thread never dies even in case of problems. */ int msgID = MSGID_EXCEPTION_RECEIVING_SYNCHRONIZATION_MESSAGE; String message = getMessage(msgID, stackTraceToSingleLineString(e)); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } } } opends/src/server/org/opends/server/synchronization/ModifyContext.java
New file @@ -0,0 +1,46 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006 Sun Microsystems, Inc. */ package org.opends.server.synchronization; /** * This class describe the synchronization context that is attached to * Modify operation. */ public class ModifyContext extends OperationContext { /** * Creates a new Modify Context with the provided parameters. * * @param changeNumber The change number of the operation. * @param uid the unique Id of the modified entry. */ public ModifyContext(ChangeNumber changeNumber, String uid) { super(changeNumber, uid); } } opends/src/server/org/opends/server/synchronization/ModifyDNMsg.java
@@ -26,7 +26,7 @@ */ package org.opends.server.synchronization; import static org.opends.server.synchronization.SynchMessages.SYNCHRONIZATION; import static org.opends.server.synchronization.OperationContext.*; import java.io.UnsupportedEncodingException; import java.util.zip.DataFormatException; @@ -41,10 +41,10 @@ */ public class ModifyDNMsg extends UpdateMessage { private String dn; private String newRDN; private String newSuperior; private boolean deleteOldRdn; private String newSuperiorId; private static final long serialVersionUID = -4905520652801395185L; /** @@ -54,106 +54,83 @@ */ public ModifyDNMsg(ModifyDNOperation op) { dn = op.getRawEntryDN().stringValue(); super((OperationContext) op.getAttachment(SYNCHROCONTEXT), op.getRawEntryDN().stringValue()); ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT); newSuperiorId = ctx.getNewParentId(); deleteOldRdn = op.deleteOldRDN(); if (op.getRawNewSuperior() != null) newSuperior = op.getRawNewSuperior().stringValue(); else newSuperior = null; newRDN = op.getRawNewRDN().stringValue(); changeNumber = (ChangeNumber) op.getAttachment(SYNCHRONIZATION); } /** * Creates a new Add message from a byte[]. * Creates a new ModifyDN message from a byte[]. * * @param in The byte[] from which the operation must be read. * @throws DataFormatException The input byte[] is not a valid AddMsg * @throws DataFormatException The input byte[] is not a valid AddMsg. * @throws UnsupportedEncodingException If UTF8 is not supported. */ public ModifyDNMsg(byte[] in) throws DataFormatException public ModifyDNMsg(byte[] in) throws DataFormatException, UnsupportedEncodingException { /* first byte is the type */ if (in[0] != MSG_TYPE_MODIFYDN_REQUEST) throw new DataFormatException("byte[] is not a valid add msg"); int pos = 1; super(in); /* read the dn int pos = decodeHeader(MSG_TYPE_MODIFYDN_REQUEST, in); /* read the newRDN * first calculate the length then construct the string */ int length = 0; int offset = pos; while (in[pos++] != 0) { if (pos > in.length) throw new DataFormatException("byte[] is not a valid add msg"); length++; } try { dn = new String(in, offset, length, "UTF-8"); int length = getNextLength(in, pos); newRDN = new String(in, pos, length, "UTF-8"); pos += length + 1; /* read the changeNumber * it is always 24 characters long */ String changenumberStr = new String(in, pos, 24, "UTF-8"); changeNumber = new ChangeNumber(changenumberStr); pos +=24; /* read the newSuperior * first calculate the length then construct the string */ length = getNextLength(in, pos); if (length != 0) newSuperior = new String(in, pos, length, "UTF-8"); else newSuperior = null; pos += length + 1; /* read the newRDN * first calculate the length then construct the string */ length = 0; offset = pos; while (in[pos++] != 0) { if (pos > in.length) throw new DataFormatException("byte[] is not a valid add msg"); length++; } newRDN = new String(in, offset, length, "UTF-8"); /* read the new parent Id */ length = getNextLength(in, pos); if (length != 0) newSuperiorId = new String(in, pos, length, "UTF-8"); else newSuperiorId = null; pos += length + 1; /* read the newSuperior * first calculate the length then construct the string */ length = 0; offset = pos; while (in[pos++] != 0) { if (pos > in.length) throw new DataFormatException("byte[] is not a valid add msg"); length++; } if (length != 0) newSuperior = new String(in, offset, length, "UTF-8"); else newSuperior = null; /* get the deleteoldrdn flag */ if (in[pos] == 0) deleteOldRdn = false; else deleteOldRdn = true; } catch (UnsupportedEncodingException e) { throw new DataFormatException("UTF-8 is not supported by this jvm."); } /* get the deleteoldrdn flag */ if (in[pos] == 0) deleteOldRdn = false; else deleteOldRdn = true; } /** * Create an operation from this ModifyDN message. * @param connection the connection to use when creating the operation * @return the created operation * {@inheritDoc} */ @Override public Operation createOperation(InternalClientConnection connection) public Operation createOperation(InternalClientConnection connection, String newDn) { ModifyDNOperation moddn = new ModifyDNOperation(connection, InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(), null, new ASN1OctetString(dn), new ASN1OctetString(newRDN), new ASN1OctetString(newDn), new ASN1OctetString(newRDN), deleteOldRdn, (newSuperior == null ? null : new ASN1OctetString(newSuperior))); moddn.setAttachment(SYNCHRONIZATION, getChangeNumber()); ModifyDnContext ctx = new ModifyDnContext(getChangeNumber(), getUniqueId(), newSuperiorId); moddn.setAttachment(SYNCHROCONTEXT, ctx); return moddn; } @@ -167,16 +144,12 @@ { try { byte[] byteDn = dn.getBytes("UTF-8"); byte[] byteNewRdn = newRDN.getBytes("UTF-8"); byte[] byteNewSuperior = null; byte[] byteNewSuperiorId = null; /* The Modify DN message is stored in the form : * <operation type><dn><changenumber><newrdn><newsuperior><deleteoldrdn> * the length of result byte array is therefore : * 1 + dn length+1 + 24 + newrdn length+1 + newsuperior length+1 +1 */ int length = 1 + byteDn.length + 1 + 24 + byteNewRdn.length + 1 + 1; // calculate the length necessary to encode the parameters int length = byteNewRdn.length + 1 + 1; if (newSuperior != null) { byteNewSuperior = newSuperior.getBytes("UTF-8"); @@ -185,42 +158,32 @@ else length += 1; byte[] resultByteArray = new byte[length]; int pos = 1; /* put the type of the operation */ resultByteArray[0] = MSG_TYPE_MODIFYDN_REQUEST; /* put the DN and a terminating 0 */ for (int i = 0; i< byteDn.length; i++,pos++) if (newSuperiorId != null) { resultByteArray[pos] = byteDn[i]; byteNewSuperiorId = newSuperiorId.getBytes("UTF-8"); length += byteNewSuperiorId.length + 1; } resultByteArray[pos++] = 0; else length += 1; /* put the ChangeNumber */ byte[] changeNumberByte = this.getChangeNumber().toString().getBytes("UTF-8"); for (int i=0; i<24; i++,pos++) { resultByteArray[pos] = changeNumberByte[i]; } byte[] resultByteArray = encodeHeader(MSG_TYPE_MODIFYDN_REQUEST, length); int pos = resultByteArray.length - length; /* put the new RDN and a terminating 0 */ for (int i = 0; i< byteNewRdn.length; i++,pos++) { resultByteArray[pos] = byteNewRdn[i]; } resultByteArray[pos++] = 0; pos = addByteArray(byteNewRdn, resultByteArray, pos); /* put the newsuperior and a terminating 0 */ if (newSuperior != null) { for (int i = 0; i< byteNewSuperior.length; i++,pos++) { resultByteArray[pos] = byteNewSuperior[i]; } pos = addByteArray(byteNewSuperior, resultByteArray, pos); } else resultByteArray[pos++] = 0; /* put the newsuperiorId and a terminating 0 */ if (newSuperiorId != null) { pos = addByteArray(byteNewSuperiorId, resultByteArray, pos); } else resultByteArray[pos++] = 0; @@ -245,7 +208,16 @@ @Override public String toString() { return ("Modify DN " + dn + " " + newRDN + " " + newSuperior + " " + return ("Modify DN " + getDn() + " " + newRDN + " " + newSuperior + " " + getChangeNumber()); } /** * Set the new superior. * @param string the new superior. */ public void setNewSuperior(String string) { newSuperior = string; } } opends/src/server/org/opends/server/synchronization/ModifyDnContext.java
New file @@ -0,0 +1,63 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006 Sun Microsystems, Inc. */ package org.opends.server.synchronization; /** * This class describe the synchronization context that is attached to * ModifyDN operation. */ public class ModifyDnContext extends OperationContext { private String newParentId; /** * Creates a new ModifyDN Context with the provided parameters. * * @param changeNumber The change number of the operation. * @param uid the unique Id of the modified entry. * @param newParentId The unique Identifier of the new parent, * can be null if the entry is to stay below the same * parent. */ public ModifyDnContext(ChangeNumber changeNumber, String uid, String newParentId) { super(changeNumber, uid); this.newParentId = newParentId; } /** * Get the unique Identifier of the new parent. * Can be null if the entry is to stay below the same parent. * * @return Returns the unique Identifier of the new parent.. */ public String getNewParentId() { return newParentId; } } opends/src/server/org/opends/server/synchronization/ModifyFakeOperation.java
@@ -44,17 +44,20 @@ { private ArrayList<Modification> mods = new ArrayList<Modification>(); private DN dn; private String entryuuid; /** * Creates a new ModifyFakeOperation with the provided information. * * @param dn The dn on which the Operation was applied. * @param changenumber The ChangeNumber of the operation. * @param entryuuid The unique ID of the entry on which the Operation applies. */ public ModifyFakeOperation(DN dn, ChangeNumber changenumber) public ModifyFakeOperation(DN dn, ChangeNumber changenumber, String entryuuid) { super(changenumber); this.dn = dn; this.entryuuid = entryuuid; } /** @@ -75,6 +78,6 @@ @Override public SynchronizationMessage generateMessage() { return new ModifyMsg(super.getChangeNumber(), dn, mods); return new ModifyMsg(super.getChangeNumber(), dn, mods, entryuuid); } } opends/src/server/org/opends/server/synchronization/ModifyMsg.java
@@ -26,7 +26,7 @@ */ package org.opends.server.synchronization; import static org.opends.server.synchronization.SynchMessages.SYNCHRONIZATION; import static org.opends.server.synchronization.OperationContext.*; import org.opends.server.core.ModifyOperation; import org.opends.server.core.Operation; @@ -52,7 +52,6 @@ public class ModifyMsg extends UpdateMessage { private static final long serialVersionUID = -4905520652801395185L; private String dn = null; private byte[] encodedMods = null; private byte[] encodedMsg = null; @@ -63,9 +62,9 @@ */ public ModifyMsg(ModifyOperation op) { dn = op.getRawEntryDN().stringValue(); super((OperationContext) op.getAttachment(OperationContext.SYNCHROCONTEXT), op.getRawEntryDN().stringValue()); encodedMods = modsToByte(op.getModifications()); changeNumber = (ChangeNumber) op.getAttachment(SYNCHRONIZATION); } /** @@ -74,12 +73,15 @@ * @param changeNumber The ChangeNumber for the operation. * @param dn The baseDN of the operation. * @param mods The mod of the operation. * @param entryuuid The unique id of the entry on which the modification * needs to apply. */ public ModifyMsg(ChangeNumber changeNumber, DN dn, List<Modification> mods) public ModifyMsg(ChangeNumber changeNumber, DN dn, List<Modification> mods, String entryuuid) { super(new ModifyContext(changeNumber, entryuuid), dn.toNormalizedString()); this.encodedMods = modsToByte(mods); this.dn = dn.toNormalizedString(); this.changeNumber = changeNumber; } /** @@ -92,19 +94,8 @@ public ModifyMsg(byte[] in) throws DataFormatException, UnsupportedEncodingException { super(in); encodedMsg = in; decodeChangeNumber(in); } private void decodeChangeNumber(byte[] in) throws DataFormatException, UnsupportedEncodingException { /* read the changeNumber */ int pos = 1; int length = getNextLength(encodedMsg, pos); String changenumberStr = new String(encodedMsg, pos, length, "UTF-8"); pos += length +1; changeNumber = new ChangeNumber(changenumberStr); } /** @@ -129,12 +120,12 @@ return encodedMsg; } /** * {@inheritDoc} */ @Override public Operation createOperation(InternalClientConnection connection) public Operation createOperation(InternalClientConnection connection, String newDn) throws LDAPException, ASN1Exception, DataFormatException { if (encodedMods == null) @@ -142,6 +133,9 @@ decode(); } if (newDn == null) newDn = getDn(); ArrayList<LDAPModification> ldapmods; ArrayList<ASN1Element> mods = null; @@ -155,8 +149,9 @@ ModifyOperation mod = new ModifyOperation(connection, InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(), null, new ASN1OctetString(dn), ldapmods); mod.setAttachment(SYNCHRONIZATION, getChangeNumber()); new ASN1OctetString(newDn), ldapmods); ModifyContext ctx = new ModifyContext(getChangeNumber(), getUniqueId()); mod.setAttachment(SYNCHROCONTEXT, ctx); return mod; } @@ -167,30 +162,11 @@ */ private void encode() throws UnsupportedEncodingException { byte[] byteDn = dn.getBytes("UTF-8"); byte[] changeNumberByte = this.getChangeNumber().toString().getBytes("UTF-8"); /* encode the header in a byte[] large enough to also contain the mods */ encodedMsg = encodeHeader(MSG_TYPE_MODIFY_REQUEST, encodedMods.length + 1); int pos = encodedMsg.length - (encodedMods.length + 1); /* The Modify message is stored in the form : * <operation type>changenumber><dn><<mods> * the length of result byte array is therefore : * 1 + dn length + 1 + 24 + mods length */ int length = 1 + changeNumberByte.length + 1 + byteDn.length + 1 + encodedMods.length + 1; encodedMsg = new byte[length]; /* put the type of the operation */ encodedMsg[0] = MSG_TYPE_MODIFY_REQUEST; int pos = 1; /* put the ChangeNumber */ pos = addByteArray(changeNumberByte, encodedMsg, pos); /* put the DN and a terminating 0 */ pos = addByteArray(byteDn, encodedMsg, pos); /* put the mods */ /* add the mods */ pos = addByteArray(encodedMods, encodedMsg, pos); } @@ -201,34 +177,14 @@ */ private void decode() throws DataFormatException { /* first byte is the type */ if (encodedMsg[0] != MSG_TYPE_MODIFY_REQUEST) throw new DataFormatException("byte[] is not a valid modify msg"); int pos = decodeHeader(MSG_TYPE_MODIFY_REQUEST, encodedMsg); try /* Read the mods : all the remaining bytes but the terminating 0 */ encodedMods = new byte[encodedMsg.length-pos-1]; int i =0; while (pos<encodedMsg.length-1) { /* read the changeNumber */ int pos = 1; int length = getNextLength(encodedMsg, pos); String changenumberStr = new String(encodedMsg, pos, length, "UTF-8"); pos += length +1; changeNumber = new ChangeNumber(changenumberStr); /* read the dn */ length = getNextLength(encodedMsg, pos); dn = new String(encodedMsg, pos, length, "UTF-8"); pos += length +1; /* Read the mods : all the remaining bytes but the terminating 0 */ encodedMods = new byte[encodedMsg.length-pos-1]; int i =0; while (pos<encodedMsg.length-1) { encodedMods[i++] = encodedMsg[pos++]; } } catch (UnsupportedEncodingException e) { throw new DataFormatException("UTF-8 is not supported by this jvm."); encodedMods[i++] = encodedMsg[pos++]; } } @@ -263,6 +219,6 @@ @Override public String toString() { return("Mod " + dn + " " + getChangeNumber()); return("Modify " + getDn() + " " + getChangeNumber()); } } opends/src/server/org/opends/server/synchronization/MultimasterSynchronization.java
@@ -72,6 +72,21 @@ new HashMap<DN, SynchronizationDomain>() ; /** * Get the ServerState associated to the SynchronizationDomain * with a given DN. * * @param baseDn The DN of the Synchronization Domain for which the * ServerState must be returned. * @return the ServerState associated to the SynchronizationDomain * with the DN in parameter. */ public static ServerState getServerState(DN baseDn) { SynchronizationDomain domain = findDomain(baseDn); return domain.getServerState(); } /** * {@inheritDoc} */ public void initializeSynchronizationProvider(ConfigEntry configEntry) @@ -245,6 +260,47 @@ return domain.handleConflictResolution(modifyOperation); } /** * {@inheritDoc} */ @Override public SynchronizationProviderResult handleConflictResolution( AddOperation addOperation) throws DirectoryException { SynchronizationDomain domain = findDomain(addOperation.getEntryDN()); if (domain == null) return new SynchronizationProviderResult(true); return domain.handleConflictResolution(addOperation); } /** * {@inheritDoc} */ @Override public SynchronizationProviderResult handleConflictResolution( DeleteOperation deleteOperation) throws DirectoryException { SynchronizationDomain domain = findDomain(deleteOperation.getEntryDN()); if (domain == null) return new SynchronizationProviderResult(true); return domain.handleConflictResolution(deleteOperation); } /** * {@inheritDoc} */ @Override public SynchronizationProviderResult handleConflictResolution( ModifyDNOperation modifyDNOperation) throws DirectoryException { SynchronizationDomain domain = findDomain(modifyDNOperation.getEntryDN()); if (domain == null) return new SynchronizationProviderResult(true); return domain.handleConflictResolution(modifyDNOperation); } /** * {@inheritDoc} @@ -275,57 +331,35 @@ * {@inheritDoc} */ @Override public SynchronizationProviderResult doPreOperation( DeleteOperation deleteOperation) throws DirectoryException { return new SynchronizationProviderResult(true); } /** * {@inheritDoc} */ @Override public SynchronizationProviderResult doPreOperation( ModifyDNOperation modifyDNOperation) throws DirectoryException { return new SynchronizationProviderResult(true); } /** * {@inheritDoc} */ @Override public SynchronizationProviderResult doPreOperation(AddOperation addOperation) { SynchronizationDomain domain = findDomain(addOperation.getEntryDN()); if (domain == null) return new SynchronizationProviderResult(true); domain.setChangeNumber(addOperation); return new SynchronizationProviderResult(true); } if (!addOperation.isSynchronizationOperation()) domain.doPreOperation(addOperation); /** * Pre-operation processing. * Called after operation has been processed by the core server * but before being committed to the backend * Generate the Change number and update the historical information * * @param deleteOperation the current operation * @return code indicating if operation must be processed */ @Override public SynchronizationProviderResult doPreOperation(DeleteOperation deleteOperation) { SynchronizationDomain domain = findDomain(deleteOperation.getEntryDN()); if (domain == null) return new SynchronizationProviderResult(true); domain.setChangeNumber(deleteOperation); return new SynchronizationProviderResult(true); } /** * Pre-operation processing. * Called after operation has been processed by the core server * but before being committed to the backend * Generate the Change number and update the historical information * * @param modifyDNOperation the current operation * @return code indicating if operation must be processed */ @Override public SynchronizationProviderResult doPreOperation(ModifyDNOperation modifyDNOperation) { SynchronizationDomain domain = findDomain(modifyDNOperation.getEntryDN()); if (domain == null) return new SynchronizationProviderResult(true); domain.setChangeNumber(modifyDNOperation); return new SynchronizationProviderResult(true); } @@ -390,21 +424,6 @@ return; } /** * Get the ServerState associated to the SynchronizationDomain * with a given DN. * * @param baseDn The DN of the Synchronization Domain for which the * ServerState must be returned. * @return the ServerState associated to the SynchronizationDomain * with the DN in parameter. */ public static ServerState getServerState(DN baseDn) { SynchronizationDomain domain = findDomain(baseDn); return domain.getServerState(); } } opends/src/server/org/opends/server/synchronization/OperationContext.java
New file @@ -0,0 +1,121 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006 Sun Microsystems, Inc. */ package org.opends.server.synchronization; import org.opends.server.core.Operation; /** * This class describe the Synchronization context that is attached * to each Operation using the SYNCHROCONTEXT key. */ public abstract class OperationContext { /** * The identifier used to attach the context to operations. */ public static final String SYNCHROCONTEXT = "synchronizationContext"; /** * The change Number of the Operation. */ private ChangeNumber changeNumber; /** * The unique Id of the entry that was modified in the original operation. */ private String entryUid; /** * Create a new OperationContext. * @param changeNumber The change number of the operation. * @param uid The unique Identifier of the modified entry. */ protected OperationContext(ChangeNumber changeNumber, String uid) { this.changeNumber = changeNumber; this.entryUid = uid; } /** * Gets The change number of the Operation. * * @return The change number of the Operation. */ public ChangeNumber getChangeNumber() { return changeNumber; } /** * Get the unique Identifier of the modiffied entry. * * @return the unique Identifier of the modiffied entry. */ public String getEntryUid() { return entryUid; } /** * Get the change number of an operation. * * @param op The operation. * @return the hange number of the provided operation. */ public static ChangeNumber getChangeNumber(Operation op) { OperationContext ctx = (OperationContext)op.getAttachment(SYNCHROCONTEXT); return ctx.changeNumber; } /** * {@inheritDoc} */ @Override public boolean equals(Object obj) { if (obj instanceof OperationContext) { OperationContext ctx = (OperationContext) obj; return ((this.changeNumber.equals(ctx.getChangeNumber()) && (this.entryUid.equals(ctx.getEntryUid())))); } else return false; } /** * {@inheritDoc} */ @Override public int hashCode() { return changeNumber.hashCode() + entryUid.hashCode(); } } opends/src/server/org/opends/server/synchronization/SynchMessages.java
@@ -36,15 +36,10 @@ public class SynchMessages { /** * name of Synchronization. */ public static final String SYNCHRONIZATION = "synchronization"; /** * Name used to store attachment of historical information in the * operation. */ public static final String HISTORICAL = "historical"; public static final String HISTORICAL = "ds-synch-historical"; /** * Invalid DN. @@ -256,6 +251,12 @@ CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 33; /** * Exception while receiving a message. */ public static final int MSGID_EXCEPTION_RECEIVING_SYNCHRONIZATION_MESSAGE = CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 34; /** * Register the messages from this class in the core server. * */ @@ -288,7 +289,7 @@ "Changelog failed to start " + "because the database %s could not be read"); MessageHandler.registerMessage(MSGID_EXCEPTION_REPLAYING_OPERATION, "Caught Exception %s when replaying operation %s : %s"); "An Exception was caught while replaying operation %s : %s"); MessageHandler.registerMessage(MSGID_NEED_CHANGELOG_PORT, "The Changelog server port must be defined"); MessageHandler.registerMessage(MSGID_ERROR_UPDATING_RUV, @@ -341,5 +342,8 @@ MessageHandler.registerMessage(MSGID_CHANGELOG_ERROR_SENDING_ACK, "An unexpected error happened sending an ack to %s." + "This connection is going to be closed. "); MessageHandler.registerMessage( MSGID_EXCEPTION_RECEIVING_SYNCHRONIZATION_MESSAGE, "An Exception was caught while receiving synchronization message : %s"); } } opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
@@ -26,15 +26,21 @@ */ package org.opends.server.synchronization; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import static org.opends.server.util.TimeThread.getTime; import static org.opends.server.synchronization.SynchMessages.*; import static org.opends.server.loggers.Error.*; import static org.opends.server.messages.MessageHandler.*; import static org.opends.server.synchronization.OperationContext.SYNCHROCONTEXT; import static org.opends.server.synchronization.Historical.*; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.SortedMap; import java.util.TreeMap; import java.util.zip.DataFormatException; import org.opends.server.api.ConfigurableComponent; import org.opends.server.api.DirectoryThread; @@ -47,17 +53,27 @@ import org.opends.server.config.StringConfigAttribute; import org.opends.server.core.AddOperation; import org.opends.server.core.DeleteOperation; import org.opends.server.core.DirectoryException; import org.opends.server.core.DirectoryServer; import org.opends.server.core.ModifyDNOperation; import org.opends.server.core.ModifyOperation; import org.opends.server.core.Operation; import org.opends.server.messages.MessageHandler; import org.opends.server.protocols.asn1.ASN1Exception; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.protocols.internal.InternalSearchOperation; import org.opends.server.protocols.ldap.LDAPException; import org.opends.server.types.ConfigChangeResult; import org.opends.server.types.DN; import org.opends.server.types.DereferencePolicy; import org.opends.server.types.Entry; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.types.RDN; import org.opends.server.types.ResultCode; import org.opends.server.types.SearchFilter; import org.opends.server.types.SearchResultEntry; import org.opends.server.types.SearchScope; import org.opends.server.types.SynchronizationProviderResult; /** @@ -88,9 +104,6 @@ private ServerState state; private int numReplayedPostOpCalled = 0; private boolean assuredFlag = false; private int maxReceiveQueue = 0; private int maxSendQueue = 0; private int maxReceiveDelay = 0; @@ -114,6 +127,8 @@ private DN configDn; private InternalClientConnection conn = new InternalClientConnection(); static String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server"; static String BASE_DN_ATTR = "ds-cfg-synchronization-dn"; static String SERVER_ID_ATTR = "ds-cfg-directory-server-id"; @@ -379,8 +394,7 @@ broker.restartReceive(); for (int i=0; i<listenerThreadNumber; i++) { ListenerThread myThread = new ListenerThread(this, changeNumberGenerator); ListenerThread myThread = new ListenerThread(this); myThread.start(); synchroThreads.add(myThread); } @@ -416,6 +430,155 @@ } /** * Implement the handleConflictResolution phase of the deleteOperation. * * @param deleteOperation The deleteOperation. * @return A SynchronizationProviderResult indicating if the operation * can continue. */ public SynchronizationProviderResult handleConflictResolution( DeleteOperation deleteOperation) { DeleteContext ctx = (DeleteContext) deleteOperation.getAttachment(SYNCHROCONTEXT); Entry deletedEntry = deleteOperation.getEntryToDelete(); if (ctx != null) { /* * This is a synchronization operation * Check that the modified entry has the same entryuuid * has was in the original message. */ String operationEntryUUID = ctx.getEntryUid(); String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry); if (!operationEntryUUID.equals(modifiedEntryUUID)) { /* * The changes entry is not the same entry as the one on * the original change was performed. * Probably the original entry was renamed and replaced with * another entry. * We must not let the change proceed, return a negative * result and set the result code to NO_SUCH_OBJET. * When the operation will return, the thread that started the * operation will try to find the correct entry and restart a new * operation. */ deleteOperation.setResultCode(ResultCode.NO_SUCH_OBJECT); return new SynchronizationProviderResult(false); } } else { // There is no Synchronization context attached to the operation // so this is not a synchronization operation. ChangeNumber changeNumber = generateChangeNumber(deleteOperation); String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry); ctx = new DeleteContext(changeNumber, modifiedEntryUUID); deleteOperation.setAttachment(SYNCHROCONTEXT, ctx); } return new SynchronizationProviderResult(true); } /** * Implement the handleConflictResolution phase of the addOperation. * * @param addOperation The AddOperation. * @return A SynchronizationProviderResult indicating if the operation * can continue. */ public SynchronizationProviderResult handleConflictResolution( AddOperation addOperation) { if (addOperation.isSynchronizationOperation()) { AddContext ctx = (AddContext) addOperation.getAttachment(SYNCHROCONTEXT); /* * If an entry with the same entry uniqueID already exist then * this operation has already been replayed in the past. */ String uuid = ctx.getEntryUid(); if (findEntryDN(uuid) != null) { addOperation.setResultCode(ResultCode.SUCCESS); return new SynchronizationProviderResult(false); } } return new SynchronizationProviderResult(true); } /** * Implement the handleConflictResolution phase of the ModifyDNOperation. * * @param modifyDNOperation The ModifyDNOperation. * @return A SynchronizationProviderResult indicating if the operation * can continue. */ public SynchronizationProviderResult handleConflictResolution( ModifyDNOperation modifyDNOperation) { ModifyDnContext ctx = (ModifyDnContext) modifyDNOperation.getAttachment(SYNCHROCONTEXT); if (ctx != null) { /* * This is a synchronization operation * Check that the modified entry has the same entryuuid * as was in the original message. */ String modifiedEntryUUID = Historical.getEntryUuid(modifyDNOperation.getOriginalEntry()); if (!modifiedEntryUUID.equals(ctx.getEntryUid())) { /* * The modified entry is not the same entry as the one on * the original change was performed. * Probably the original entry was renamed and replaced with * another entry. * We must not let the change proceed, return a negative * result and set the result code to NO_SUCH_OBJET. * When the operation will return, the thread that started the * operation will try to find the correct entry and restart a new * operation. */ modifyDNOperation.setResultCode(ResultCode.NO_SUCH_OBJECT); return new SynchronizationProviderResult(false); } if (modifyDNOperation.getNewSuperior() != null) { /* * Also check that the current id of the * parent is the same as when the operation was performed. */ String newParentId = findEntryId(modifyDNOperation.getNewSuperior()); if (!newParentId.equals(ctx.getNewParentId())) { modifyDNOperation.setResultCode(ResultCode.NO_SUCH_OBJECT); return new SynchronizationProviderResult(false); } } } else { // There is no Synchronization context attached to the operation // so this is not a synchronization operation. ChangeNumber changeNumber = generateChangeNumber(modifyDNOperation); String newParentId = null; if (modifyDNOperation.getNewSuperior() != null) { newParentId = findEntryId(modifyDNOperation.getNewSuperior()); } Entry modifiedEntry = modifyDNOperation.getOriginalEntry(); String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry); ctx = new ModifyDnContext(changeNumber, modifiedEntryUUID, newParentId); modifyDNOperation.setAttachment(SYNCHROCONTEXT, ctx); } return new SynchronizationProviderResult(true); } /** * Handle the conflict resolution. * Called by the core server after locking the entry and before * starting the actual modification. @@ -425,25 +588,43 @@ public SynchronizationProviderResult handleConflictResolution( ModifyOperation modifyOperation) { // If operation do not yet have a change number, generate it ChangeNumber changeNumber = (ChangeNumber) modifyOperation.getAttachment(SYNCHRONIZATION); if (changeNumber == null) { synchronized(pendingChanges) { changeNumber = changeNumberGenerator.NewChangeNumber(); pendingChanges.put(changeNumber, new PendingChange(changeNumber, modifyOperation, null)); } modifyOperation.setAttachment(SYNCHRONIZATION, changeNumber); } ModifyContext ctx = (ModifyContext) modifyOperation.getAttachment(SYNCHROCONTEXT); // if Operation is a synchronization operation, solve conflicts if (modifyOperation.isSynchronizationOperation()) Entry modifiedEntry = modifyOperation.getModifiedEntry(); if (ctx == null) { Entry modifiedEntry = modifyOperation.getModifiedEntry(); // There is no Synchronization context attached to the operation // so this is not a synchronization operation. ChangeNumber changeNumber = generateChangeNumber(modifyOperation); String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry); ctx = new ModifyContext(changeNumber, modifiedEntryUUID); modifyOperation.setAttachment(SYNCHROCONTEXT, ctx); } else { String modifiedEntryUUID = ctx.getEntryUid(); String currentEntryUUID = Historical.getEntryUuid(modifiedEntry); if (!currentEntryUUID.equals(modifiedEntryUUID)) { /* * The current modified entry is not the same entry as the one on * the original modification was performed. * Probably the original entry was renamed and replaced with * another entry. * We must not let the modification proceed, return a negative * result and set the result code to NO_SUCH_OBJET. * When the operation will return, the thread that started the * operation will try to find the correct entry and restart a new * operation. */ modifyOperation.setResultCode(ResultCode.NO_SUCH_OBJECT); return new SynchronizationProviderResult(false); } /* * Solve the conflicts between modify operations */ Historical historicalInformation = Historical.load(modifiedEntry); modifyOperation.setAttachment(HISTORICAL, historicalInformation); @@ -456,16 +637,29 @@ * stop the processing and send an OK result */ modifyOperation.setResultCode(ResultCode.SUCCESS); /* * TODO : check that post operation do get called and * that pendingChanges do get updated */ return new SynchronizationProviderResult(false); } } return new SynchronizationProviderResult(true); } /** * The preOperation phase for the add Operation. * Its job is to generate the Synchronization context associated to the * operation. It is necessary to do it in this phase because contrary to * the other operations, the entry uid is not set when the handleConflict * phase is called. * * @param addOperation The Add Operation. */ public void doPreOperation(AddOperation addOperation) { AddContext ctx = new AddContext(generateChangeNumber(addOperation), Historical.getEntryUuid(addOperation), findEntryId(addOperation.getEntryDN().getParent())); addOperation.setAttachment(SYNCHROCONTEXT, ctx); } /** * Receive an update message from the changelog. @@ -494,6 +688,7 @@ /** * Do the necessary processing when an UpdateMessage was received. * * @param update The received UpdateMessage. */ public void receiveUpdate(UpdateMessage update) @@ -548,49 +743,20 @@ { numReplayedPostOpCalled++; UpdateMessage msg = null; ChangeNumber curChangeNumber = (ChangeNumber) op.getAttachment(SYNCHRONIZATION); ChangeNumber curChangeNumber = OperationContext.getChangeNumber(op); if (op.getResultCode() != ResultCode.SUCCESS) { if (curChangeNumber != null) { /* * This code can be executed by multiple threads * Since TreeMap is not synchronized, it is mandatory to synchronize * it now. */ synchronized (pendingChanges) { pendingChanges.remove(curChangeNumber); } } return; } ResultCode result = op.getResultCode(); boolean isAssured = isAssured(op); if (!op.isSynchronizationOperation()) if ((result == ResultCode.SUCCESS) && (!op.isSynchronizationOperation())) { switch (op.getOperationType()) msg = UpdateMessage.generateMsg(op, isAssured); if (msg == null) { case MODIFY : msg = new ModifyMsg((ModifyOperation) op); break; case ADD: msg = new AddMsg((AddOperation) op); break; case DELETE : msg = new DeleteMsg((DeleteOperation) op); break; case MODIFY_DN : msg = new ModifyDNMsg((ModifyDNOperation) op); break; default : /* * This is an operation type that we do not know about * It should never happen * This code can be executed by multiple threads * Since TreeMap is not synchronized, it is mandatory to synchronize * it now. * It should never happen. */ synchronized (pendingChanges) { @@ -603,68 +769,44 @@ return; } } if (isAssured(op)) { msg.setAssured(); } } synchronized(pendingChanges) { PendingChange curChange = pendingChanges.get(curChangeNumber); if (curChange == null) if (result == ResultCode.SUCCESS) { // This should never happen int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING; String message = getMessage(msgID, curChangeNumber.toString(), op.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); return; } curChange.setCommitted(true); if (op.isSynchronizationOperation()) curChange.setOp(op); else curChange.setMsg(msg); ChangeNumber firstChangeNumber = pendingChanges.firstKey(); PendingChange firstChange = pendingChanges.get(firstChangeNumber); ChangeNumber lastCommittedChangeNumber = null; if (!op.isSynchronizationOperation() && msg.isAssured()) { waitingAckMsgs.put(curChangeNumber, msg); } while ((firstChange != null) && firstChange.isCommitted()) { if (firstChange.getOp().isSynchronizationOperation() == false) PendingChange curChange = pendingChanges.get(curChangeNumber); if (curChange == null) { numSentUpdates++; broker.publish(firstChange.getMsg()); // This should never happen int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING; String message = getMessage(msgID, curChangeNumber.toString(), op.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); return; } curChange.setCommitted(true); lastCommittedChangeNumber = firstChange.getChangeNumber(); pendingChanges.remove(lastCommittedChangeNumber); if (pendingChanges.isEmpty()) { firstChange = null; } if (op.isSynchronizationOperation()) curChange.setOp(op); else curChange.setMsg(msg); if (!op.isSynchronizationOperation() && isAssured && (msg != null)) { firstChangeNumber = pendingChanges.firstKey(); firstChange = pendingChanges.get(firstChangeNumber); waitingAckMsgs.put(curChangeNumber, msg); } } if (lastCommittedChangeNumber != null) state.update(lastCommittedChangeNumber); else if (!op.isSynchronizationOperation()) pendingChanges.remove(curChangeNumber); pushCommittedChanges(); } if (!op.isSynchronizationOperation() && msg.isAssured()) if ((!op.isSynchronizationOperation()) && msg.isAssured() && (msg != null) && (result == ResultCode.SUCCESS)) { synchronized (msg) { @@ -683,19 +825,6 @@ } /** * Check if an operation must be processed as an assured operation. * * @param op the operation to be checked. * @return true if the operations must be processed as an assured operation. */ private boolean isAssured(Operation op) { // TODO : should have a filtering mechanism for checking // operation that are assured and operations that are not. return assuredFlag; } /** * get the number of updates received by the synchronization plugin. * * @return the number of updates received @@ -790,27 +919,6 @@ } /** * Generate and set the ChangeNumber of a given Operation. * * @param operation The Operation for which the ChangeNumber must be set. */ public void setChangeNumber(Operation operation) { ChangeNumber changeNumber = (ChangeNumber) operation.getAttachment(SYNCHRONIZATION); if (changeNumber == null) { synchronized(pendingChanges) { changeNumber = changeNumberGenerator.NewChangeNumber(); pendingChanges.put(changeNumber, new PendingChange(changeNumber, operation, null)); } operation.setAttachment(SYNCHRONIZATION, changeNumber); } } /** * {@inheritDoc} */ @Override @@ -826,7 +934,7 @@ synchroThreads = new ArrayList<ListenerThread>(); for (int i=0; i<10; i++) { ListenerThread myThread = new ListenerThread(this, changeNumberGenerator); ListenerThread myThread = new ListenerThread(this); myThread.start(); synchroThreads.add(myThread); } @@ -913,6 +1021,126 @@ } /** * Create and replay a synchronized Operation from an UpdateMessage. * * @param msg The UpdateMessage to be replayed. */ public void replay(UpdateMessage msg) { Operation op = null; boolean done = false; ChangeNumber changeNumber = null; try { while (!done) { op = msg.createOperation(conn); op.setInternalOperation(true); op.setSynchronizationOperation(true); changeNumber = OperationContext.getChangeNumber(op); if (changeNumber != null) changeNumberGenerator.adjust(changeNumber); op.run(); ResultCode result = op.getResultCode(); if (result != ResultCode.SUCCESS) { if (op instanceof ModifyOperation) { ModifyOperation newOp = (ModifyOperation) op; done = solveNamingConflict(newOp, msg); } else if (op instanceof DeleteOperation) { DeleteOperation newOp = (DeleteOperation) op; done = solveNamingConflict(newOp, msg); } else if (op instanceof AddOperation) { AddOperation newOp = (AddOperation) op; done = solveNamingConflict(newOp, msg); } else if (op instanceof ModifyDNOperation) { ModifyDNOperation newOp = (ModifyDNOperation) op; done = solveNamingConflict(newOp, msg); } else { done = true; // unknown type of operation ?! } } else { done = true; } } } catch (ASN1Exception e) { int msgID = MSGID_EXCEPTION_DECODING_OPERATION; String message = getMessage(msgID, msg) + stackTraceToSingleLineString(e); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } catch (LDAPException e) { int msgID = MSGID_EXCEPTION_DECODING_OPERATION; String message = getMessage(msgID, msg) + stackTraceToSingleLineString(e); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } catch (DataFormatException e) { int msgID = MSGID_EXCEPTION_DECODING_OPERATION; String message = getMessage(msgID, msg) + stackTraceToSingleLineString(e); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } catch (Exception e) { if (changeNumber != null) { /* * An Exception happened during the replay process. * Continue with the next change but the servers will know start * to be inconsistent. * TODO : REPAIR : Should let the repair tool know about this */ int msgID = MSGID_EXCEPTION_REPLAYING_OPERATION; String message = getMessage(msgID, stackTraceToSingleLineString(e), op.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); updateError(changeNumber); } else { int msgID = MSGID_EXCEPTION_DECODING_OPERATION; String message = getMessage(msgID, stackTraceToSingleLineString(e), msg.toString()); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); } } finally { if (msg.isAssured()) ack(msg.getChangeNumber()); incProcessedUpdates(); } } /** * This methods is called when an error happends while replaying * and operation. * It is necessary because the postOPeration does not always get @@ -925,6 +1153,434 @@ synchronized (pendingChanges) { pendingChanges.remove(changeNumber); pushCommittedChanges(); } } /** * Generate a new change number and insert it in the pending list. * * @param operation The operation for which the change number must be * generated. * @return The new change number. */ private ChangeNumber generateChangeNumber(Operation operation) { ChangeNumber changeNumber; synchronized(pendingChanges) { changeNumber = changeNumberGenerator.NewChangeNumber(); pendingChanges.put(changeNumber, new PendingChange(changeNumber, operation, null)); } return changeNumber; } /** * Find the Unique Id of the entry with the provided DN by doing a * search of the entry and extracting its uniqueID from its attributes. * * @param dn The dn of the entry for which the unique Id is searched. * * @return The unique Id of the entry whith the provided DN. */ private String findEntryId(DN dn) { if (dn == null) return null; try { LinkedHashSet<String> attrs = new LinkedHashSet<String>(1); attrs.add(ENTRYUIDNAME); InternalSearchOperation search = conn.processSearch(dn, SearchScope.BASE_OBJECT, DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0, false, SearchFilter.createFilterFromString("objectclass=*"), attrs); if (search.getResultCode() == ResultCode.SUCCESS) { LinkedList<SearchResultEntry> result = search.getSearchEntries(); if (!result.isEmpty()) { SearchResultEntry resultEntry = result.getFirst(); if (resultEntry != null) { return Historical.getEntryUuid(resultEntry); } } } } catch (DirectoryException e) { // never happens because the filter is always valid. } return null; } /** * find the current dn of an entry from its entry uuid. * * @param uuid the Entry Unique ID. * @return The curernt dn of the entry or null if there is no entry with * the specified uuid. */ private DN findEntryDN(String uuid) { try { InternalSearchOperation search = conn.processSearch(baseDN, SearchScope.WHOLE_SUBTREE, SearchFilter.createFilterFromString("entryuuid="+uuid)); if (search.getResultCode() == ResultCode.SUCCESS) { LinkedList<SearchResultEntry> result = search.getSearchEntries(); if (!result.isEmpty()) { SearchResultEntry resultEntry = result.getFirst(); if (resultEntry != null) { return resultEntry.getDN(); } } } } catch (DirectoryException e) { // never happens because the filter is always valid. } return null; } /** * Solve a conflict detected when replaying a modify operation. * * @param op The operation that triggered the conflict detection. * @param msg The operation that triggered the conflict detection. * @return true if the process is completed, false if it must continue.. */ private boolean solveNamingConflict(ModifyOperation op, UpdateMessage msg) { ResultCode result = op.getResultCode(); ModifyContext ctx = (ModifyContext) op.getAttachment(SYNCHROCONTEXT); String entryUid = ctx.getEntryUid(); if (result == ResultCode.NO_SUCH_OBJECT) { /* * This error may happen the operation is a modification but * the entry had been renamed on a different master in the same time. * search if the entry has been renamed, and return the new dn * of the entry. */ msg.setDn(findEntryDN(entryUid).toString()); return false; } return true; } /** Solve a conflict detected when replaying a delete operation. * * @param op The operation that triggered the conflict detection. * @param msg The operation that triggered the conflict detection. * @return true if the process is completed, false if it must continue.. */ private boolean solveNamingConflict(DeleteOperation op, UpdateMessage msg) { ResultCode result = op.getResultCode(); DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT); String entryUid = ctx.getEntryUid(); if (result == ResultCode.NO_SUCH_OBJECT) { /* * Find if the entry is still in the database. */ DN currentDn = findEntryDN(entryUid); if (currentDn == null) { /* * The entry has already been deleted, either because this delete * has already been replayed or because another concurrent delete * has already done the job. * In any case, there is is nothing more to do. */ return true; } else { /* * This entry has been renamed, replay the delete using its new DN. */ msg.setDn(currentDn.toString()); return false; } } else if (result == ResultCode.NOT_ALLOWED_ON_NONLEAF) { /* * This may happen when we replay a DELETE done on a master * but children of this entry have been added on another master. */ /* * TODO : either delete all the childs or rename the child below * the top suffix by adding entryuuid in dn and delete this entry. */ } return true; } /** * Solve a conflict detected when replaying a ADD operation. * * @param op The operation that triggered the conflict detection. * @param msg The operation that triggered the conflict detection. * @return true if the process is completed, false if it must continue. * @throws Exception When the operation is not valid. */ private boolean solveNamingConflict(AddOperation op, UpdateMessage msg) throws Exception { ResultCode result = op.getResultCode(); AddContext ctx = (AddContext) op.getAttachment(SYNCHROCONTEXT); String entryUid = ctx.getEntryUid(); String parentUniqueId = ctx.getParentUid(); if (result == ResultCode.NO_SUCH_OBJECT) { /* * This can happen if the parent has been renamed or deleted * find the parent dn and calculate a new dn for the entry */ if (parentUniqueId == null) { /* * This entry is the base dn of the backend. * It is quite weird that the operation result be NO_SUCH_OBJECT. * There is notthing more we can do except TODO log a * message for the repair tool to look at this problem. */ return true; } DN parentDn = findEntryDN(parentUniqueId); if (parentDn == null) { /* * The parent has been deleted, so this entry should not * exist don't do the ADD. */ return true; } else { RDN entryRdn = op.getEntryDN().getRDN(); msg.setDn(parentDn + "," + entryRdn); return false; } } else if (result == ResultCode.ENTRY_ALREADY_EXISTS) { /* * This can happen if * - two adds are done on different servers but with the * same target DN. * - the same ADD is being replayed for the second time on this server. * if the nsunique ID already exist, assume this is a replay and * don't do anything * if the entry unique id do not exist, generate conflict. */ if (findEntryDN(entryUid) != null) { // entry already exist : this is a replay return true; } else { addConflict(op); msg.setDn(generateConflictDn(entryUid, msg.getDn())); return false; } } return true; } /** * Solve a conflict detected when replaying a Modify DN operation. * * @param op The operation that triggered the conflict detection. * @param msg The operation that triggered the conflict detection. * @return true if the process is completed, false if it must continue. * @throws Exception When the operation is not valid. */ private boolean solveNamingConflict(ModifyDNOperation op, UpdateMessage msg) throws Exception { ResultCode result = op.getResultCode(); ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT); String entryUid = ctx.getEntryUid(); String newSuperiorID = ctx.getNewParentId(); if (result == ResultCode.NO_SUCH_OBJECT) { ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg; /* * four possible cases : * - the modified entry has been renamed * - the new parent has been renamed * - the operation is replayed for the second time. * - the entry has been deleted * action : * - change the target dn and the new parent dn and * restart the operation, * - don't do anything if the operation is replayed. */ // Construct the new DN to use for the entry. DN entryDN = op.getEntryDN(); DN newSuperior = findEntryDN(newSuperiorID); RDN newRDN = op.getNewRDN(); DN parentDN; if (newSuperior == null) { parentDN = entryDN.getParent(); } else { parentDN = newSuperior; } if ((parentDN == null) || parentDN.isNullDN()) { /* this should never happen * can't solve any conflict in this case. */ throw new Exception("operation parameters are invalid"); } RDN[] parentComponents = parentDN.getRDNComponents(); RDN[] newComponents = new RDN[parentComponents.length+1]; System.arraycopy(parentComponents, 0, newComponents, 1, parentComponents.length); newComponents[0] = newRDN; DN newDN = new DN(newComponents); // get the current DN of this entry in the database. DN currentDN = findEntryDN(entryUid); // if the newDN and the current DN match then the operation // is a no-op (this was probably a second replay) // don't do anything. if (newDN.equals(currentDN)) { return true; } msg.setDn(currentDN.toString()); modifyDnMsg.setNewSuperior(newSuperior.toString()); return false; } else if (result == ResultCode.ENTRY_ALREADY_EXISTS) { /* * This may happen when two modifyDn operation * are done on different servers but with the same target DN * add the conflict object class to the entry * and rename it using its entryuuid. */ generateAddConflictOp(op); msg.setDn(generateConflictDn(entryUid, msg.getDn())); return false; } return true; } /** * Generate a modification to add the conflict ObjectClass to an entry * whose Dn is now conflicting with another entry. * * @param op The operation causing the conflict. */ private void generateAddConflictOp(ModifyDNOperation op) { // TODO } /** * Add the conflict object class to an entry that could * not be added because it is conflicting with another entry. * * @param addOp The conflicting Add Operation. */ private void addConflict(AddOperation addOp) { /* * TODO */ } /** * Generate the Dn to use for a conflicting entry. * * @param op Operation that generated the conflict * @param dn Original dn. * @return The generated Dn for a conflicting entry. */ private String generateConflictDn(String entryUid, String dn) { return dn + "entryuuid=" + entryUid; } /** * Check if an operation must be processed as an assured operation. * * @param op the operation to be checked. * @return true if the operations must be processed as an assured operation. */ private boolean isAssured(Operation op) { // TODO : should have a filtering mechanism for checking // operation that are assured and operations that are not. return false; } /** * Push all committed local changes to the changelog service. * PRECONDITION : The pendingChanges lock must be held before calling * this method. */ private void pushCommittedChanges() { if (pendingChanges.isEmpty()) return; ChangeNumber firstChangeNumber = pendingChanges.firstKey(); PendingChange firstChange = pendingChanges.get(firstChangeNumber); while ((firstChange != null) && firstChange.isCommitted()) { if (firstChange.getOp().isSynchronizationOperation() == false) { numSentUpdates++; broker.publish(firstChange.getMsg()); } state.update(firstChangeNumber); pendingChanges.remove(firstChangeNumber); if (pendingChanges.isEmpty()) { firstChange = null; } else { firstChangeNumber = pendingChanges.firstKey(); firstChange = pendingChanges.get(firstChangeNumber); } } } } opends/src/server/org/opends/server/synchronization/UpdateMessage.java
@@ -27,8 +27,13 @@ package org.opends.server.synchronization; import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.util.zip.DataFormatException; import org.opends.server.core.AddOperation; import org.opends.server.core.DeleteOperation; import org.opends.server.core.ModifyDNOperation; import org.opends.server.core.ModifyOperation; import org.opends.server.core.Operation; import org.opends.server.protocols.asn1.ASN1Exception; import org.opends.server.protocols.internal.InternalClientConnection; @@ -45,7 +50,12 @@ /** * The ChangeNumber of this update. */ protected ChangeNumber changeNumber; private ChangeNumber changeNumber; /** * The DN on which the update was originally done. */ private String dn = null; /** * True when the update must use assured replication. @@ -53,6 +63,83 @@ private boolean assuredFlag = false; /** * The UniqueId of the entry that was updated. */ private String UniqueId; /** * Creates a new UpdateMessage with the given informations. * * @param ctx The Synchronization Context of the operation for which the * update message must be created,. * @param dn The dn of the entry on which the change * that caused the creation of this object happened */ public UpdateMessage(OperationContext ctx, String dn) { this.changeNumber = ctx.getChangeNumber(); this.UniqueId = ctx.getEntryUid(); this.dn = dn; } /** * Creates a new UpdateMessage from an ecoded byte array. * * @param in The encoded byte array containind the UpdateMessage. * @throws DataFormatException if the encoded byte array is not valid. * @throws UnsupportedEncodingException if UTF-8 is not supprted. */ public UpdateMessage(byte[] in) throws DataFormatException, UnsupportedEncodingException { /* read the changeNumber */ int pos = 1; int length = getNextLength(in, pos); String changenumberStr = new String(in, pos, length, "UTF-8"); this.changeNumber = new ChangeNumber(changenumberStr); } /** * Generates an Update Message which the provided information. * * @param op The operation fo which the message must be created. * @param isAssured flag indicating if the operation is an assured operation. * @return The generated message. */ public static UpdateMessage generateMsg(Operation op, boolean isAssured) { UpdateMessage msg = null; switch (op.getOperationType()) { case MODIFY : msg = new ModifyMsg((ModifyOperation) op); if (isAssured) msg.setAssured(); break; case ADD: msg = new AddMsg((AddOperation) op); if (isAssured) msg.setAssured(); break; case DELETE : msg = new DeleteMsg((DeleteOperation) op); if (isAssured) msg.setAssured(); break; case MODIFY_DN : msg = new ModifyDNMsg((ModifyDNOperation) op); if (isAssured) msg.setAssured(); break; } return msg; } /** * Get the ChangeNumber from the message. * @return the ChangeNumber */ @@ -62,6 +149,35 @@ } /** * Get the DN on which the operation happened. * * @return The DN on which the operations happened. */ public String getDn() { return dn; } /** * Set the DN. * @param dn The dn that must now be used for this message. */ public void setDn(String dn) { this.dn = dn; } /** * Get the Unique Identifier of the entry on which the operation happened. * * @return The Unique Identifier of the entry on which the operation happened. */ public String getUniqueId() { return UniqueId; } /** * Get a boolean indicating if the Update must be processed as an * Asynchronous or as an assured synchronization. * @@ -117,10 +233,115 @@ * @throws ASN1Exception In case of ASN1 decoding exception. * @throws DataFormatException In case of bad msg format. */ public abstract Operation createOperation(InternalClientConnection conn) public Operation createOperation(InternalClientConnection conn) throws LDAPException, ASN1Exception, DataFormatException { return createOperation(conn, dn); } /** * Create and Operation from the message using the provided DN. * * @param conn connection to use when creating the message. * @param newDn the DN to use when creating the operation. * @return the created Operation. * @throws LDAPException In case of LDAP decoding exception. * @throws ASN1Exception In case of ASN1 decoding exception. * @throws DataFormatException In case of bad msg format. */ public abstract Operation createOperation(InternalClientConnection conn, String newDn) throws LDAPException, ASN1Exception, DataFormatException; /** * Encode the common header for all the UpdateMessage. * * @param type the type of UpdateMessage to encode. * @param additionalLength additional length needed to encode the remaining * part of the UpdateMessage. * @return a byte array containing the common header and enough space to * encode the reamining bytes of the UpdateMessage as was specified * by the additionalLength. * (byte array length = common header length + additionalLength) * @throws UnsupportedEncodingException if UTF-8 is not supported. */ public byte[] encodeHeader(byte type, int additionalLength) throws UnsupportedEncodingException { byte[] byteDn = dn.getBytes("UTF-8"); byte[] changeNumberByte = this.getChangeNumber().toString().getBytes("UTF-8"); byte[] byteEntryuuid = getUniqueId().getBytes("UTF-8"); /* The message header is stored in the form : * <operation type>changenumber><dn><entryuuid><change> * the length of result byte array is therefore : * 1 + dn length + 1 + 24 + additional_length */ int length = 1 + changeNumberByte.length + 1 + byteDn.length + 1 + byteEntryuuid.length + 1 + additionalLength; byte[] encodedMsg = new byte[length]; /* put the type of the operation */ encodedMsg[0] = type; int pos = 1; /* put the ChangeNumber */ pos = addByteArray(changeNumberByte, encodedMsg, pos); /* put the DN and a terminating 0 */ pos = addByteArray(byteDn, encodedMsg, pos); /* put the entry uuid and a terminating 0 */ pos = addByteArray(byteEntryuuid, encodedMsg, pos); return encodedMsg; } /** * Decode the Header part of this Update Message, and check its type. * * @param type The type of this Update Message. * @param encodedMsg the encoded form of the UpdateMessage. * @return the position at which the remaining part of the message starts. * @throws DataFormatException if the encodedMsg does not contain a valid * common header. */ public int decodeHeader(byte type, byte [] encodedMsg) throws DataFormatException { /* first byte is the type */ if (encodedMsg[0] != type) throw new DataFormatException("byte[] is not a valid msg"); try { /* read the changeNumber */ int pos = 1; int length = getNextLength(encodedMsg, pos); String changenumberStr = new String(encodedMsg, pos, length, "UTF-8"); pos += length + 1; changeNumber = new ChangeNumber(changenumberStr); /* read the dn */ length = getNextLength(encodedMsg, pos); dn = new String(encodedMsg, pos, length, "UTF-8"); pos += length + 1; /* read the entryuuid */ length = getNextLength(encodedMsg, pos); UniqueId = new String(encodedMsg, pos, length, "UTF-8"); pos += length + 1; return pos; } catch (UnsupportedEncodingException e) { throw new DataFormatException("UTF-8 is not supported by this jvm."); } } /** * {@inheritDoc} */ opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ModifyConflictTest.java
@@ -36,6 +36,8 @@ import org.testng.annotations.Test; import static org.testng.Assert.*; import static org.opends.server.synchronization.OperationContext.*; import org.opends.server.core.DirectoryServer; import org.opends.server.core.ModifyOperation; import org.opends.server.protocols.internal.InternalClientConnection; @@ -47,13 +49,12 @@ import org.opends.server.types.Modification; import org.opends.server.types.ModificationType; import org.opends.server.types.ObjectClass; import static org.opends.server.synchronization.SynchMessages.SYNCHRONIZATION; /* * Test the conflict resolution for modify operations * This is still a work in progress. * currently implemented tests * - check that an replace with a smaller csn is ignored * - check that an replace with a smaller csn is ignored * should test : * - conflict with multi-valued attributes * - conflict with single-valued attributes @@ -74,53 +75,53 @@ @Test() public void replaceAndAdd() throws Exception { { /* * Objectclass and DN do not have any impact on the modifty conflict * resolution for the description attribute. * Always use the same values for all these tests. */ DN dn = DN.decode("dc=com"); DN dn = DN.decode("dc=com"); Map<ObjectClass, String> objectClasses = new HashMap<ObjectClass, String>(); ObjectClass org = DirectoryServer.getObjectClass("organization"); objectClasses.put(org, "organization"); /* * start with a new entry with an empty description */ Entry entry = new Entry(dn, objectClasses, null, null); Historical hist = Historical.load(entry); /* * simulate a modify-replace done at time t10 */ testModify(entry, hist, "description", ModificationType.REPLACE, "init value", 10, true); /* * Now simulate an add at an earlier date that the previous replace * conflict resolution should remove it. */ */ testModify(entry, hist, "description", ModificationType.ADD, "older value", 1, false); /* * Now simulate an add at an earlier date that the previous replace * conflict resolution should remove it. * (a second time to make sure...) */ */ testModify(entry, hist, "description", ModificationType.ADD, "older value", 2, false); /* * Now simulate an add at a later date that the previous replace. * conflict resolution should keep it */ */ testModify(entry, hist, "description", ModificationType.ADD, "new value", 11, true); } /** * Test that conflict between a modify-delete-attribute and modify-add * for multi-valued attributes are handled correctly. @@ -128,53 +129,53 @@ @Test() public void deleteAndAdd() throws Exception { { /* * Objectclass and DN do not have any impact on the modifty conflict * resolution for the description attribute. * Always use the same values for all these tests. */ DN dn = DN.decode("dc=com"); DN dn = DN.decode("dc=com"); Map<ObjectClass, String> objectClasses = new HashMap<ObjectClass, String>(); ObjectClass org = DirectoryServer.getObjectClass("organization"); objectClasses.put(org, "organization"); /* * start with a new entry with an empty description */ Entry entry = new Entry(dn, objectClasses, null, null); Historical hist = Historical.load(entry); /* * simulate a delete of the whole description attribute done at time t10 */ testModify(entry, hist, "description", ModificationType.DELETE, null, 10, true); /* * Now simulate an add at an earlier date that the previous delete. * The conflict resolution should detect that this add must be ignored. */ */ testModify(entry, hist, "description", ModificationType.ADD, "older value", 1, false); /* * Now simulate an add at an earlier date that the previous delete. * The conflict resolution should detect that this add must be ignored. * (a second time to make sure that historical information is kept...) */ */ testModify(entry, hist, "description", ModificationType.ADD, "older value", 2, false); /* * Now simulate an add at a later date that the previous delete. * conflict resolution should keep it */ */ testModify(entry, hist, "description", ModificationType.ADD, "new value", 11, true); } /** * Test that conflict between a modify-add and modify-add * for multi-valued attributes are handled correctly. @@ -182,67 +183,67 @@ @Test() public void addAndAdd() throws Exception { { /* * Objectclass and DN do not have any impact on the modifty conflict * resolution for the description attribute. * Always use the same values for all these tests. */ DN dn = DN.decode("dc=com"); DN dn = DN.decode("dc=com"); Map<ObjectClass, String> objectClasses = new HashMap<ObjectClass, String>(); ObjectClass org = DirectoryServer.getObjectClass("organization"); objectClasses.put(org, "organization"); /* * start with a new entry with an empty description */ Entry entry = new Entry(dn, objectClasses, null, null); Historical hist = Historical.load(entry); /* * simulate a add of the description attribute done at time t10 */ testModify(entry, hist, "description", ModificationType.ADD, "init value", 10, true); /* * Now simulate an add at an earlier date that the previous add. * The conflict resolution should detect that this add must be kept. */ testModify(entry, hist, "description", ModificationType.ADD, */ testModify(entry, hist, "description", ModificationType.ADD, "older value", 1, true); /* * Now simulate an add at an earlier date that the previous add. * The conflict resolution should detect that this add must be kept. * (a second time to make sure that historical information is kept...) */ */ testModify(entry, hist, "description", ModificationType.ADD, "older value", 2, false); /* * Now simulate an add at a later date that the previous add. * conflict resolution should keep it */ */ testModify(entry, hist, "description", ModificationType.ADD, "new value", 11, true); "new value", 11, true); } /* * helper function. */ private static void testModify(Entry entry, Historical hist, String attrName, private static void testModify(Entry entry, Historical hist, String attrName, ModificationType modType, String value, int date, boolean keepChangeResult) { InternalClientConnection connection = new InternalClientConnection(); ChangeNumber t = new ChangeNumber(date, (short) 0, (short) 0); /* create AttributeType description that will be usedfor this test */ AttributeType attrType = DirectoryServer.getAttributeType(attrName, true); LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>(); if (value != null) values.add(new AttributeValue(attrType, value)); @@ -250,17 +251,17 @@ List<Modification> mods = new ArrayList<Modification>(); Modification mod = new Modification(modType, attr); mods.add(mod); ModifyOperation modOp = new ModifyOperation(connection, 1, 1, null, entry.getDN(), mods); modOp.setAttachment(SYNCHRONIZATION, t); ModifyContext ctx = new ModifyContext(t, "uniqueId"); modOp.setAttachment(SYNCHROCONTEXT, ctx); hist.replayOperation(modOp, entry); /* * The last older change should have been detected as conflicting * and should be removed by the conflict resolution code. * The last older change should have been detected as conflicting * and should be removed by the conflict resolution code. */ if (keepChangeResult) { opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationMsgTest.java
@@ -51,7 +51,7 @@ import org.opends.server.types.RDN; import org.opends.server.util.TimeThread; import static org.opends.server.synchronization.SynchMessages.SYNCHRONIZATION; import static org.opends.server.synchronization.OperationContext.*; /** * Test the contructors, encoders and decoders of the synchronization @@ -130,10 +130,10 @@ { DN dn = DN.decode(rawdn); InternalClientConnection connection = new InternalClientConnection(); ModifyMsg msg = new ModifyMsg(changeNumber, dn, mods); ModifyMsg msg = new ModifyMsg(changeNumber, dn, mods, "fakeuniqueid"); ModifyMsg generatedMsg = new ModifyMsg(msg.getBytes()); assertEquals(msg.changeNumber, generatedMsg.changeNumber); assertEquals(msg.getChangeNumber(), generatedMsg.getChangeNumber()); Operation op = msg.createOperation(connection); Operation generatedOperation = generatedMsg.createOperation(connection); @@ -145,6 +145,8 @@ ModifyOperation mod2 = (ModifyOperation) generatedOperation; assertEquals(mod1.getRawEntryDN(), mod2.getRawEntryDN()); assertEquals( mod1.getAttachment(SYNCHROCONTEXT), mod2.getAttachment(SYNCHROCONTEXT)); /* * TODO : test that the generated mod equals the original mod. @@ -178,11 +180,11 @@ DN.decode(rawDN)); ChangeNumber cn = new ChangeNumber(TimeThread.getTime(), (short) 123, (short) 45); op.setAttachment(SYNCHRONIZATION, cn); op.setAttachment(SYNCHROCONTEXT, new DeleteContext(cn, "uniqueid")); DeleteMsg msg = new DeleteMsg(op); DeleteMsg generatedMsg = new DeleteMsg(msg.getBytes()); assertEquals(msg.changeNumber, generatedMsg.changeNumber); assertEquals(msg.getChangeNumber(), generatedMsg.getChangeNumber()); Operation generatedOperation = generatedMsg.createOperation(connection); @@ -219,13 +221,14 @@ ChangeNumber cn = new ChangeNumber(TimeThread.getTime(), (short) 123, (short) 45); op.setAttachment(SYNCHRONIZATION, cn); op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(cn, "uniqueid", "newparentId")); ModifyDNMsg msg = new ModifyDNMsg(op); ModifyDNMsg generatedMsg = new ModifyDNMsg(msg.getBytes()); Operation generatedOperation = generatedMsg.createOperation(connection); ModifyDNOperation mod2 = (ModifyDNOperation) generatedOperation; assertEquals(msg.changeNumber, generatedMsg.changeNumber); assertEquals(msg.getChangeNumber(), generatedMsg.getChangeNumber()); assertEquals(op.getRawEntryDN(), mod2.getRawEntryDN()); assertEquals(op.getRawNewRDN(), mod2.getRawNewRDN()); assertEquals(op.deleteOldRDN(), mod2.deleteOldRDN()); @@ -238,7 +241,7 @@ {"dc=test,dc=com"}, }; } @Test(dataProvider = "addEncodeDecode") public void addEncodeDecode(String rawDN) throws Exception @@ -269,7 +272,8 @@ ChangeNumber cn = new ChangeNumber(TimeThread.getTime(), (short) 123, (short) 45); AddMsg msg = new AddMsg(cn, rawDN, objectClass, userAttributes, AddMsg msg = new AddMsg(cn, rawDN, "thisIsaUniqueID", "parentUniqueId", objectClass, userAttributes, operationalAttributes); AddMsg generatedMsg = new AddMsg(msg.getBytes()); assertEquals(msg.getBytes(), generatedMsg.getBytes());