- pre-operation plugins are not called anymore when processing synchronization operations
for ADD,DELELTE and MODIFYDN as it was already the case for MODIFY operation
This is necessary to make sure that entries use the same unique ID everywhere.
- Synchronization now its own serialization mechanism for synchronization messages
(before it was using java object serialization)
This allows to use the same serialization for data transfer between LDAP servers
and changelog servers and for saving the changes into the changelog database.
It will also allow to make some evolution the protocol without breaking the compatibility
with previous releases.
This also brings some performance improvements as this new serialization mechanism
only needs to be done once on the master that originally processed
the operation while before it was done for each transfer over the wire and for writing
the changelog database.
1 files added
26 files modified
| | |
| | | * Performs any necessary processing that should be done just before |
| | | * the Directory Server performs the core processing for an add |
| | | * operation. |
| | | * This method is not called when processing synchronization |
| | | * operations. |
| | | * |
| | | * @param addOperation The add operation to be processed. |
| | | * |
| | |
| | | * Performs any necessary processing that should be done just before |
| | | * the Directory Server performs the core processing for a delete |
| | | * operation. |
| | | * This method is not called when processing synchronization |
| | | * operations. |
| | | * |
| | | * @param deleteOperation The delete operation to be processed. |
| | | * |
| | |
| | | * the Directory Server performs the core processing for a modify |
| | | * operation. |
| | | * |
| | | * This method is not called when processing synchronization |
| | | * operations. |
| | | * @param modifyOperation The modify operation to be processed. |
| | | * |
| | | * @return Information about the result of the plugin processing. |
| | |
| | | * Performs any necessary processing that should be done just before |
| | | * the Directory Server performs the core processing for a modify DN |
| | | * operation. |
| | | * This method is not called when processing synchronization |
| | | * operations. |
| | | * |
| | | * @param modifyDNOperation The modify DN operation to be |
| | | * processed. |
| | |
| | | { |
| | | newSocket = listenSocket.accept(); |
| | | ServerHandler handler = new ServerHandler( |
| | | new SerializingProtocolSession(newSocket)); |
| | | new SocketSession(newSocket)); |
| | | handler.start(null); |
| | | } catch (IOException e) |
| | | { |
| | |
| | | socket.connect(ServerAddr, 500); |
| | | |
| | | ServerHandler handler = new ServerHandler( |
| | | new SerializingProtocolSession(socket)); |
| | | new SocketSession(socket)); |
| | | handler.start(baseDn); |
| | | } |
| | | catch (IOException e) |
| | |
| | | * There is not much more that we can do at this point except trying |
| | | * to continue with the next record. |
| | | * In such case, it is therefore possible that we miss some changes. |
| | | * TODO. log an error message. |
| | | * TODO. Such problem should be handled by the repair functionality. |
| | | */ |
| | | } |
| | |
| | | package org.opends.server.changelog; |
| | | |
| | | import com.sleepycat.je.DatabaseEntry; |
| | | import org.opends.server.synchronization.AddMsg; |
| | | import org.opends.server.synchronization.DeleteMsg; |
| | | import org.opends.server.synchronization.ModifyDNMsg; |
| | | import org.opends.server.synchronization.ModifyMsg; |
| | | import org.opends.server.synchronization.SynchronizationMessage; |
| | | import org.opends.server.synchronization.UpdateMessage; |
| | | |
| | | import static org.opends.server.protocols.ldap.LDAPConstants.*; |
| | | |
| | | /** |
| | | * SuperClass of DatabaseEntry used for data stored in the Changelog Databases. |
| | | */ |
| | |
| | | */ |
| | | public ChangelogData(UpdateMessage change) |
| | | { |
| | | this.setData(change.getByte()); |
| | | this.setData(change.getBytes()); |
| | | } |
| | | |
| | | /** |
| | |
| | | public static UpdateMessage generateChange(byte[] data) |
| | | throws Exception |
| | | { |
| | | UpdateMessage msg = null; |
| | | switch (data[0]) |
| | | { |
| | | case OP_TYPE_MODIFY_REQUEST: |
| | | msg = (UpdateMessage) new ModifyMsg(data); |
| | | break; |
| | | case OP_TYPE_ADD_REQUEST: |
| | | msg = (UpdateMessage) new AddMsg(data); |
| | | break; |
| | | case OP_TYPE_DELETE_REQUEST: |
| | | msg = (UpdateMessage) new DeleteMsg(data); |
| | | break; |
| | | case OP_TYPE_MODIFY_DN_REQUEST: |
| | | msg = (UpdateMessage) new ModifyDNMsg(data); |
| | | break; |
| | | } |
| | | return msg; |
| | | return (UpdateMessage) SynchronizationMessage.generateMsg(data); |
| | | } |
| | | } |
| | |
| | | package org.opends.server.changelog; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.synchronization.SynchronizationMessage; |
| | | |
| | | /** |
| | |
| | | * @throws IOException When error happened durin IO process. |
| | | * @throws ClassNotFoundException When the data received does extend the |
| | | * SynchronizationMessage class. |
| | | * @throws DataFormatException When the data received is not formatted as a |
| | | * SynchronizationMessage. |
| | | */ |
| | | public abstract SynchronizationMessage receive() |
| | | throws IOException, ClassNotFoundException; |
| | | throws IOException, ClassNotFoundException, |
| | | DataFormatException; |
| | | } |
| | |
| | | public String toString() |
| | | { |
| | | String localString; |
| | | if (serverIsLDAPserver) |
| | | localString = "Directory Server "; |
| | | else |
| | | localString = "Changelog Server "; |
| | | if (serverId != 0) |
| | | { |
| | | if (serverIsLDAPserver) |
| | | localString = "Directory Server "; |
| | | else |
| | | localString = "Changelog Server "; |
| | | |
| | | localString += serverId + " " + serverURL + " " + baseDn; |
| | | |
| | | localString += serverId + " " + serverURL + " " + baseDn; |
| | | } |
| | | else |
| | | localString = "Unknown server"; |
| | | |
| | | return localString; |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE |
| | | * or https://OpenDS.dev.java.net/OpenDS.LICENSE. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at |
| | | * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, |
| | | * add the following below this CDDL HEADER, with the fields enclosed |
| | | * by brackets "[]" replaced with your own identifying * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.changelog; |
| | | |
| | | import java.io.IOException; |
| | | import java.io.InputStream; |
| | | import java.io.OutputStream; |
| | | import java.net.Socket; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.synchronization.SynchronizationMessage; |
| | | |
| | | /** |
| | | * TODO : should have some versioning in the packets so that |
| | | * the futur versions can evolve while still |
| | | * being able to understand the older versions. |
| | | */ |
| | | public class SocketSession implements ProtocolSession |
| | | { |
| | | private Socket socket; |
| | | private InputStream input; |
| | | private OutputStream output; |
| | | byte[] rcvLengthBuf = new byte[8]; |
| | | |
| | | /** |
| | | * Creates a new SocketSession based on the provided socket. |
| | | * |
| | | * @param socket The Socket on which the SocketSession will be based. |
| | | * @throws IOException When an IException happens on the socket. |
| | | */ |
| | | public SocketSession(Socket socket) throws IOException |
| | | { |
| | | this.socket = socket; |
| | | input = socket.getInputStream(); |
| | | output = socket.getOutputStream(); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public void close() throws IOException |
| | | { |
| | | socket.close(); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public synchronized void publish(SynchronizationMessage msg) |
| | | throws IOException |
| | | { |
| | | byte[] buffer = msg.getBytes(); |
| | | String str = String.format("%08x", buffer.length); |
| | | byte[] sendLengthBuf = str.getBytes(); |
| | | |
| | | output.write(sendLengthBuf); |
| | | output.write(buffer); |
| | | output.flush(); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public SynchronizationMessage receive() throws IOException, |
| | | ClassNotFoundException, DataFormatException |
| | | { |
| | | /* Read the first 8 bytes containing the packet length */ |
| | | int length = 0; |
| | | |
| | | while (length<8) |
| | | { |
| | | int read = input.read(rcvLengthBuf, length, 8-length); |
| | | if (read == -1) |
| | | throw new IOException("no more data"); |
| | | else |
| | | length += read; |
| | | } |
| | | |
| | | int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16); |
| | | |
| | | try |
| | | { |
| | | length = 0; |
| | | byte[] buffer = new byte[totalLength]; |
| | | while (length < totalLength) |
| | | length += input.read(buffer, length, totalLength - length); |
| | | return SynchronizationMessage.generateMsg(buffer); |
| | | } |
| | | catch (OutOfMemoryError e) |
| | | { |
| | | throw new IOException("Packet too large, can't allocate " |
| | | + totalLength + " bytes."); |
| | | } |
| | | } |
| | | } |
| | |
| | | List<Attribute> attrList = operationalAttributes.get(t); |
| | | if (attrList == null) |
| | | { |
| | | if (DirectoryServer.addMissingRDNAttributes()) |
| | | if (isSynchronizationOperation() || |
| | | DirectoryServer.addMissingRDNAttributes()) |
| | | { |
| | | LinkedHashSet<AttributeValue> valueList = |
| | | new LinkedHashSet<AttributeValue>(1); |
| | |
| | | |
| | | if (! found) |
| | | { |
| | | if (DirectoryServer.addMissingRDNAttributes()) |
| | | if (isSynchronizationOperation() || |
| | | DirectoryServer.addMissingRDNAttributes()) |
| | | { |
| | | LinkedHashSet<AttributeValue> valueList = |
| | | new LinkedHashSet<AttributeValue>(1); |
| | |
| | | List<Attribute> attrList = userAttributes.get(t); |
| | | if (attrList == null) |
| | | { |
| | | if (DirectoryServer.addMissingRDNAttributes()) |
| | | if (isSynchronizationOperation() || |
| | | DirectoryServer.addMissingRDNAttributes()) |
| | | { |
| | | LinkedHashSet<AttributeValue> valueList = |
| | | new LinkedHashSet<AttributeValue>(1); |
| | |
| | | |
| | | if (! found) |
| | | { |
| | | if (DirectoryServer.addMissingRDNAttributes()) |
| | | if (isSynchronizationOperation() || |
| | | DirectoryServer.addMissingRDNAttributes()) |
| | | { |
| | | LinkedHashSet<AttributeValue> valueList = |
| | | new LinkedHashSet<AttributeValue>(1); |
| | |
| | | } |
| | | |
| | | |
| | | // Invoke the pre-operation add plugins. |
| | | PreOperationPluginResult preOpResult = |
| | | pluginConfigManager.invokePreOperationAddPlugins(this); |
| | | if (preOpResult.connectionTerminated()) |
| | | // If the operation is not a synchronization operation, |
| | | // Invoke the pre-operation modify plugins. |
| | | if (!isSynchronizationOperation()) |
| | | { |
| | | // There's no point in continuing with anything. Log the result |
| | | // and return. |
| | | setResultCode(ResultCode.CANCELED); |
| | | PreOperationPluginResult preOpResult = |
| | | pluginConfigManager.invokePreOperationAddPlugins(this); |
| | | if (preOpResult.connectionTerminated()) |
| | | { |
| | | // There's no point in continuing with anything. Log the result |
| | | // and return. |
| | | setResultCode(ResultCode.CANCELED); |
| | | |
| | | int msgID = MSGID_CANCELED_BY_PREOP_DISCONNECT; |
| | | appendErrorMessage(getMessage(msgID)); |
| | | int msgID = MSGID_CANCELED_BY_PREOP_DISCONNECT; |
| | | appendErrorMessage(getMessage(msgID)); |
| | | |
| | | processingStopTime = System.currentTimeMillis(); |
| | | processingStopTime = System.currentTimeMillis(); |
| | | |
| | | logAddResponse(this); |
| | | return; |
| | | } |
| | | else if (preOpResult.sendResponseImmediately()) |
| | | { |
| | | skipPostOperation = true; |
| | | break addProcessing; |
| | | logAddResponse(this); |
| | | return; |
| | | } |
| | | else if (preOpResult.sendResponseImmediately()) |
| | | { |
| | | skipPostOperation = true; |
| | | break addProcessing; |
| | | } |
| | | } |
| | | |
| | | |
| | |
| | | buffer.append(rawEntryDN); |
| | | buffer.append(")"); |
| | | } |
| | | |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | |
| | | // Invoke the pre-delete plugins. |
| | | PreOperationPluginResult preOpResult = |
| | | pluginConfigManager.invokePreOperationDeletePlugins(this); |
| | | if (preOpResult.connectionTerminated()) |
| | | // If the operation is not a synchronization operation, |
| | | // invoke the pre-delete plugins. |
| | | if (!isSynchronizationOperation()) |
| | | { |
| | | // There's no point in continuing with anything. Log the request and |
| | | // result and return. |
| | | setResultCode(ResultCode.CANCELED); |
| | | PreOperationPluginResult preOpResult = |
| | | pluginConfigManager.invokePreOperationDeletePlugins(this); |
| | | if (preOpResult.connectionTerminated()) |
| | | { |
| | | // There's no point in continuing with anything. Log the request |
| | | // and result and return. |
| | | setResultCode(ResultCode.CANCELED); |
| | | |
| | | int msgID = MSGID_CANCELED_BY_PREOP_DISCONNECT; |
| | | appendErrorMessage(getMessage(msgID)); |
| | | int msgID = MSGID_CANCELED_BY_PREOP_DISCONNECT; |
| | | appendErrorMessage(getMessage(msgID)); |
| | | |
| | | processingStopTime = System.currentTimeMillis(); |
| | | logDeleteResponse(this); |
| | | return; |
| | | } |
| | | else if (preOpResult.sendResponseImmediately()) |
| | | { |
| | | skipPostOperation = true; |
| | | break deleteProcessing; |
| | | processingStopTime = System.currentTimeMillis(); |
| | | logDeleteResponse(this); |
| | | return; |
| | | } |
| | | else if (preOpResult.sendResponseImmediately()) |
| | | { |
| | | skipPostOperation = true; |
| | | break deleteProcessing; |
| | | } |
| | | } |
| | | |
| | | |
| | |
| | | int modCount = modifications.size(); |
| | | |
| | | |
| | | // If the operation is not a synchronization operation, |
| | | // Invoke the pre-operation modify DN plugins. |
| | | PreOperationPluginResult preOpResult = |
| | | pluginConfigManager.invokePreOperationModifyDNPlugins(this); |
| | | if (preOpResult.connectionTerminated()) |
| | | if (!isSynchronizationOperation()) |
| | | { |
| | | // There's no point in continuing with anything. Log the request and |
| | | // result and return. |
| | | setResultCode(ResultCode.CANCELED); |
| | | PreOperationPluginResult preOpResult = |
| | | pluginConfigManager.invokePreOperationModifyDNPlugins(this); |
| | | if (preOpResult.connectionTerminated()) |
| | | { |
| | | // There's no point in continuing with anything. Log the request |
| | | // and result and return. |
| | | setResultCode(ResultCode.CANCELED); |
| | | |
| | | int msgID = MSGID_CANCELED_BY_PREOP_DISCONNECT; |
| | | appendErrorMessage(getMessage(msgID)); |
| | | int msgID = MSGID_CANCELED_BY_PREOP_DISCONNECT; |
| | | appendErrorMessage(getMessage(msgID)); |
| | | |
| | | processingStopTime = System.currentTimeMillis(); |
| | | logModifyDNResponse(this); |
| | | return; |
| | | } |
| | | else if (preOpResult.sendResponseImmediately()) |
| | | { |
| | | skipPostOperation = true; |
| | | break modifyDNProcessing; |
| | | processingStopTime = System.currentTimeMillis(); |
| | | logModifyDNResponse(this); |
| | | return; |
| | | } |
| | | else if (preOpResult.sendResponseImmediately()) |
| | | { |
| | | skipPostOperation = true; |
| | | break modifyDNProcessing; |
| | | } |
| | | } |
| | | |
| | | |
| | |
| | | this.entryDN = entryDN; |
| | | this.modifications = modifications; |
| | | |
| | | rawEntryDN = new ASN1OctetString(rawEntryDN.toString()); |
| | | rawEntryDN = new ASN1OctetString(entryDN.toString()); |
| | | |
| | | rawModifications = new ArrayList<LDAPModification>(modifications.size()); |
| | | for (Modification m : modifications) |
| | |
| | | return; |
| | | } |
| | | |
| | | |
| | | // If the operation is not a synchronization operation, |
| | | // Invoke the pre-operation modify plugins. |
| | | if (!isSynchronizationOperation()) |
| | | { |
| | |
| | | package org.opends.server.synchronization; |
| | | |
| | | import java.io.Serializable; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | /** |
| | | * This Class is used to send acks between LDAP and changelog servers. |
| | |
| | | } |
| | | |
| | | /** |
| | | * Creates a new AckMessage by decoding the provided byte array. |
| | | * |
| | | * @param in The byte array containing the encoded form of the AckMessage. |
| | | * @throws DataFormatException If in does not contain a properly encoded |
| | | * AckMessage. |
| | | */ |
| | | public AckMessage(byte[] in) throws DataFormatException |
| | | { |
| | | try |
| | | { |
| | | /* first byte is the type */ |
| | | if (in[0] != MSG_TYPE_ACK) |
| | | throw new DataFormatException("byte[] is not a valid modify msg"); |
| | | int pos = 1; |
| | | |
| | | /* read the changeNumber |
| | | * it is always 24 characters long |
| | | */ |
| | | String changenumberStr = new String(in, pos, 24, "UTF-8"); |
| | | changeNumber = new ChangeNumber(changenumberStr); |
| | | pos +=24; |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | throw new DataFormatException("UTF-8 is not supported by this jvm."); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get the ChangeNumber from the message. |
| | | * |
| | | * @return the ChangeNumber |
| | |
| | | { |
| | | return changeNumber; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | |
| | | domain.receiveAck(this); |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public byte[] getBytes() |
| | | { |
| | | try |
| | | { |
| | | int length = 1 + 24; |
| | | byte[] resultByteArray = new byte[length]; |
| | | int pos = 1; |
| | | |
| | | /* put the type of the operation */ |
| | | resultByteArray[0] = MSG_TYPE_ACK; |
| | | |
| | | resultByteArray[pos++] = 0; |
| | | /* put the ChangeNumber */ |
| | | byte[] changeNumberByte; |
| | | |
| | | changeNumberByte = this.getChangeNumber().toString().getBytes("UTF-8"); |
| | | |
| | | for (int i=0; i<24; i++,pos++) |
| | | { |
| | | resultByteArray[pos] = changeNumberByte[i]; |
| | | } |
| | | return resultByteArray; |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | return null; |
| | | } |
| | | } |
| | | } |
| | |
| | | package org.opends.server.synchronization; |
| | | |
| | | import org.opends.server.core.AddOperation; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.protocols.asn1.ASN1Element; |
| | | import org.opends.server.protocols.asn1.ASN1Exception; |
| | | import org.opends.server.protocols.asn1.ASN1OctetString; |
| | | |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.ArrayList; |
| | | import java.util.Collection; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.List; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.protocols.ldap.LDAPAttribute; |
| | | import org.opends.server.protocols.ldap.LDAPException; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.AttributeValue; |
| | | |
| | | import static org.opends.server.synchronization.SynchMessages.*; |
| | | import static org.opends.server.protocols.ldap.LDAPConstants.*; |
| | | import static org.opends.server.util.StaticUtils.toLowerCase; |
| | | |
| | | /** |
| | | * This class is used to exchange Add operation between LDAP servers |
| | |
| | | */ |
| | | public AddMsg(AddOperation op) |
| | | { |
| | | dn = op.getRawEntryDN().stringValue(); |
| | | List<LDAPAttribute> attrs = op.getRawAttributes(); |
| | | ArrayList<ASN1Element> elems = new ArrayList<ASN1Element>(attrs.size()); |
| | | // Encode the object classes (SET OF LDAPString). |
| | | LinkedHashSet<AttributeValue> ocValues = |
| | | new LinkedHashSet<AttributeValue>(op.getObjectClasses().size()); |
| | | for (String s : op.getObjectClasses().values()) |
| | | { |
| | | ocValues.add(new AttributeValue(new ASN1OctetString(s), |
| | | new ASN1OctetString(toLowerCase(s)))); |
| | | } |
| | | |
| | | /* |
| | | * encode each LDAPAttribute into an ASN1Element |
| | | * then add each ASN1Element into an ArrayList, |
| | | * then encode the Arraylist of ASN1Element into a byte[] |
| | | */ |
| | | for (LDAPAttribute attr : attrs) |
| | | elems.add(attr.encode()); |
| | | Attribute attr = new Attribute( |
| | | DirectoryServer.getObjectClassAttributeType(), |
| | | "objectClass", ocValues); |
| | | |
| | | ArrayList<ASN1Element> elems = new ArrayList<ASN1Element>(); |
| | | |
| | | elems.add(new LDAPAttribute(attr).encode()); |
| | | |
| | | // Encode the user attributes (AttributeList). |
| | | for (List<Attribute> list : op.getUserAttributes().values()) |
| | | { |
| | | for (Attribute a : list) |
| | | { |
| | | elems.add(new LDAPAttribute(a).encode()); |
| | | } |
| | | } |
| | | |
| | | // Encode the operational attributes (AttributeList). |
| | | for (List<Attribute> list : op.getOperationalAttributes().values()) |
| | | { |
| | | for (Attribute a : list) |
| | | { |
| | | elems.add(new LDAPAttribute(a).encode()); |
| | | } |
| | | } |
| | | |
| | | dn = op.getRawEntryDN().stringValue(); |
| | | |
| | | // Encode the sequence. |
| | | encodedAttributes = ASN1Element.encodeValue(elems); |
| | | |
| | | changeNumber = (ChangeNumber) op.getAttachment(SYNCHRONIZATION); |
| | | } |
| | | |
| | | /** |
| | | * Creates a new AddMessage. |
| | | * |
| | | * @param cn ChangeNumber of the add. |
| | | * @param dn DN of the added entry. |
| | | * @param objectClass objectclass of the added entry. |
| | | * @param userAttributes user attributes of the added entry. |
| | | * @param operationalAttributes operational attributes of the added entry. |
| | | */ |
| | | public AddMsg(ChangeNumber cn, |
| | | String dn, |
| | | Attribute objectClass, |
| | | Collection<Attribute> userAttributes, |
| | | Collection<Attribute> operationalAttributes) |
| | | { |
| | | this.dn = dn; |
| | | this.changeNumber = cn; |
| | | |
| | | ArrayList<ASN1Element> elems = new ArrayList<ASN1Element>(); |
| | | elems.add(new LDAPAttribute(objectClass).encode()); |
| | | |
| | | for (Attribute a : userAttributes) |
| | | elems.add(new LDAPAttribute(a).encode()); |
| | | |
| | | for (Attribute a : operationalAttributes) |
| | | elems.add(new LDAPAttribute(a).encode()); |
| | | |
| | | encodedAttributes = ASN1Element.encodeValue(elems); |
| | | } |
| | | |
| | | /** |
| | | * Creates a new Add message from a byte[]. |
| | | * |
| | | * @param in The byte[] from which the operation must be read. |
| | | * @throws Exception The input byte[] is not a valid AddMsg |
| | | * @throws DataFormatException The input byte[] is not a valid AddMsg |
| | | */ |
| | | public AddMsg(byte[] in) throws Exception |
| | | public AddMsg(byte[] in) throws DataFormatException |
| | | { |
| | | /* first byte is the type */ |
| | | if (in[0] != OP_TYPE_ADD_REQUEST) |
| | | throw new Exception("byte[] is not a valid add msg"); |
| | | if (in[0] != MSG_TYPE_ADD_REQUEST) |
| | | throw new DataFormatException("byte[] is not a valid add msg"); |
| | | int pos = 1; |
| | | |
| | | /* read the dn |
| | |
| | | while (in[pos++] != 0) |
| | | { |
| | | if (pos > in.length) |
| | | throw new Exception("byte[] is not a valid add msg"); |
| | | throw new DataFormatException("byte[] is not a valid add msg"); |
| | | length++; |
| | | } |
| | | dn = new String(in, offset, length, "UTF-8"); |
| | | try |
| | | { |
| | | dn = new String(in, offset, length, "UTF-8"); |
| | | |
| | | /* read the changeNumber |
| | | * it is always 24 characters long |
| | | */ |
| | | String changenumberStr = new String(in, pos, 24, "UTF-8"); |
| | | changeNumber = new ChangeNumber(changenumberStr); |
| | | pos +=24; |
| | | /* read the changeNumber |
| | | * it is always 24 characters long |
| | | */ |
| | | String changenumberStr = new String(in, pos, 24, "UTF-8"); |
| | | changeNumber = new ChangeNumber(changenumberStr); |
| | | pos +=24; |
| | | } catch (UnsupportedEncodingException e ) { |
| | | throw new DataFormatException("UTF-8 is not supported by this jvm."); |
| | | } |
| | | |
| | | /* Read the attributes : all the remaining bytes */ |
| | | encodedAttributes = new byte[in.length-pos]; |
| | |
| | | * @return the byte array representation of this Message. |
| | | */ |
| | | @Override |
| | | public byte[] getByte() |
| | | public byte[] getBytes() |
| | | { |
| | | byte[] byteDn; |
| | | try |
| | |
| | | int pos = 1; |
| | | |
| | | /* put the type of the operation */ |
| | | resultByteArray[0] = OP_TYPE_ADD_REQUEST; |
| | | resultByteArray[0] = MSG_TYPE_ADD_REQUEST; |
| | | /* put the DN and a terminating 0 */ |
| | | for (int i = 0; i< byteDn.length; i++,pos++) |
| | | { |
| | |
| | | import java.net.Socket; |
| | | |
| | | import org.opends.server.changelog.ProtocolSession; |
| | | import org.opends.server.changelog.SerializingProtocolSession; |
| | | import org.opends.server.changelog.SocketSession; |
| | | import org.opends.server.protocols.asn1.ASN1OctetString; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.protocols.internal.InternalSearchListener; |
| | |
| | | private Object lock = new Object(); |
| | | private String changelogServer = "Not connected"; |
| | | private TreeSet<FakeOperation> replayOperations; |
| | | private SerializingProtocolSession session = null; |
| | | private ProtocolSession session = null; |
| | | |
| | | /** |
| | | * Creates a new Changelog Broker for a particular SynchronizationDomain. |
| | |
| | | InetAddress.getByName(hostname), Integer.parseInt(port)); |
| | | Socket socket = new Socket(); |
| | | socket.connect(ServerAddr, 500); |
| | | session = new SerializingProtocolSession(socket); |
| | | session = new SocketSession(socket); |
| | | |
| | | /* |
| | | * Send our ServerStartMessage. |
| | |
| | | package org.opends.server.synchronization; |
| | | |
| | | import java.io.Serializable; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.core.DirectoryException; |
| | |
| | | { |
| | | private static final long serialVersionUID = -5871385537169856856L; |
| | | |
| | | private String baseDn = null; |
| | | private short serverId; |
| | | private String serverURL; |
| | | private String baseDn = null; |
| | | private ServerState serverState; |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Creates a new ChangelogStartMessage by decoding the provided byte array. |
| | | * @param in A byte array containing the encoded information for the |
| | | * ChangelogStartMessage |
| | | * @throws DataFormatException If the in does not contain a properly |
| | | * encoded ChangelogStartMessage. |
| | | */ |
| | | public ChangelogStartMessage(byte[] in) throws DataFormatException |
| | | { |
| | | /* The ChangelogStartMessage is encoded in the form : |
| | | * <baseDn><ServerId><ServerUrl><ServerState> |
| | | */ |
| | | try |
| | | { |
| | | /* first byte is the type */ |
| | | if (in[0] != MSG_TYPE_CHANGELOG_START) |
| | | throw new DataFormatException("input is not a valid ChangelogStartMsg"); |
| | | int pos = 1; |
| | | |
| | | /* read the dn |
| | | * first calculate the length then construct the string |
| | | */ |
| | | int length = getNextLength(in, pos); |
| | | baseDn = new String(in, pos, length, "UTF-8"); |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the ServerId |
| | | */ |
| | | length = getNextLength(in, pos); |
| | | String serverIdString = new String(in, pos, length, "UTF-8"); |
| | | serverId = Short.valueOf(serverIdString); |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the ServerURL |
| | | */ |
| | | length = getNextLength(in, pos); |
| | | serverURL = new String(in, pos, length, "UTF-8"); |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the ServerState |
| | | */ |
| | | serverState = new ServerState(in, pos, in.length-1); |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | throw new DataFormatException("UTF-8 is not supported by this jvm."); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get the Server Id. |
| | | * @return the server id |
| | | */ |
| | |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public byte[] getBytes() |
| | | { |
| | | /* The ChangelogStartMessage is stored in the form : |
| | | * <operation type><basedn><serverid><serverURL><serverState> |
| | | */ |
| | | try { |
| | | byte[] byteDn = baseDn.getBytes("UTF-8"); |
| | | byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8"); |
| | | byte[] byteServerUrl = serverURL.getBytes("UTF-8"); |
| | | byte[] byteServerState = serverState.getBytes(); |
| | | |
| | | int length = 1 + byteDn.length + 1 + byteServerId.length + 1 + |
| | | byteServerUrl.length + 1 + byteServerState.length + 1; |
| | | |
| | | byte[] resultByteArray = new byte[length]; |
| | | |
| | | /* put the type of the operation */ |
| | | resultByteArray[0] = MSG_TYPE_CHANGELOG_START; |
| | | int pos = 1; |
| | | |
| | | /* put the baseDN and a terminating 0 */ |
| | | pos = addByteArray(byteDn, resultByteArray, pos); |
| | | |
| | | /* put the ServerId */ |
| | | pos = addByteArray(byteServerId, resultByteArray, pos); |
| | | |
| | | /* put the ServerURL */ |
| | | pos = addByteArray(byteServerUrl, resultByteArray, pos); |
| | | |
| | | /* put the ServerState */ |
| | | pos = addByteArray(byteServerState, resultByteArray, pos); |
| | | |
| | | return resultByteArray; |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | return null; |
| | | } |
| | | } |
| | | } |
| | |
| | | */ |
| | | package org.opends.server.synchronization; |
| | | |
| | | import static org.opends.server.protocols.ldap.LDAPConstants.*; |
| | | import static org.opends.server.synchronization.SynchMessages.SYNCHRONIZATION; |
| | | |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.core.DeleteOperation; |
| | | import org.opends.server.core.Operation; |
| | |
| | | * Creates a new Add message from a byte[]. |
| | | * |
| | | * @param in The byte[] from which the operation must be read. |
| | | * @throws Exception The input byte[] is not a valid AddMsg |
| | | * @throws DataFormatException The input byte[] is not a valid AddMsg |
| | | */ |
| | | public DeleteMsg(byte[] in) throws Exception |
| | | public DeleteMsg(byte[] in) throws DataFormatException |
| | | { |
| | | /* first byte is the type */ |
| | | if (in[0] != OP_TYPE_DELETE_REQUEST) |
| | | throw new Exception("byte[] is not a valid delete msg"); |
| | | if (in[0] != MSG_TYPE_DELETE_REQUEST) |
| | | throw new DataFormatException("byte[] is not a valid delete msg"); |
| | | int pos = 1; |
| | | |
| | | /* read the dn |
| | |
| | | while (in[pos++] != 0) |
| | | { |
| | | if (pos > in.length) |
| | | throw new Exception("byte[] is not a valid delete msg"); |
| | | throw new DataFormatException("byte[] is not a valid delete msg"); |
| | | length++; |
| | | } |
| | | dn = new String(in, offset, length, "UTF-8"); |
| | | try |
| | | { |
| | | dn = new String(in, offset, length, "UTF-8"); |
| | | |
| | | /* read the changeNumber |
| | | * it is always 24 characters long |
| | | */ |
| | | String changenumberStr = new String(in, pos, 24, "UTF-8"); |
| | | changeNumber = new ChangeNumber(changenumberStr); |
| | | /* read the changeNumber |
| | | * it is always 24 characters long |
| | | */ |
| | | String changenumberStr = new String(in, pos, 24, "UTF-8"); |
| | | changeNumber = new ChangeNumber(changenumberStr); |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | throw new DataFormatException("UTF-8 is not supported by this jvm."); |
| | | } |
| | | } |
| | | |
| | | |
| | |
| | | * @return The byte array representation of this Message. |
| | | */ |
| | | @Override |
| | | public byte[] getByte() |
| | | public byte[] getBytes() |
| | | { |
| | | byte[] byteDn; |
| | | try |
| | |
| | | int pos = 1; |
| | | |
| | | /* put the type of the operation */ |
| | | resultByteArray[0] = OP_TYPE_DELETE_REQUEST; |
| | | resultByteArray[0] = MSG_TYPE_DELETE_REQUEST; |
| | | |
| | | /* put the DN and a terminating 0 */ |
| | | for (int i = 0; i< byteDn.length; i++,pos++) |
| | |
| | | * The last update seen on this entry, allows fast conflict detection. |
| | | */ |
| | | private ChangeNumber moreRecentChangenumber = |
| | | new ChangeNumber((long)0,0,(short)0); |
| | | new ChangeNumber(0,0,(short)0); |
| | | |
| | | /* |
| | | * contains Historical information for each attribute sorted by attribute type |
| | |
| | | /** |
| | | * Process an operation. |
| | | * This method is responsible for detecting and resolving conflict for |
| | | * modifyOperation. This is done by comparing the historical information |
| | | * stored |
| | | * modifyOperation. This is done by using the historical information. |
| | | * |
| | | * @param modifyOperation the operation to be processed |
| | | * @param modifiedEntry the entry that is being modified (before modification) |
| | | */ |
| | | public void replayOperation(ModifyOperation modifyOperation, |
| | | Entry modifiedEntry) |
| | | Entry modifiedEntry) |
| | | { |
| | | /* TODO : this process should be split in 2 parts called |
| | | * in the resolveConflict and preOperation phases |
| | | */ |
| | | List<Modification> mods = modifyOperation.getModifications(); |
| | | ChangeNumber changeNumber = |
| | | (ChangeNumber) modifyOperation.getAttachment(SYNCHRONIZATION); |
| | |
| | | } |
| | | |
| | | // TODO : now purge old historical information |
| | | |
| | | if (moreRecentChangenumber == null || |
| | | moreRecentChangenumber.older(changeNumber)) |
| | | moreRecentChangenumber = changeNumber; |
| | |
| | | * This method calculate the historical information and update the hist |
| | | * attribute to store the historical information for modify operation that |
| | | * does not conflict with previous operation. |
| | | * This is the usual path and care should therefore be optimized. |
| | | * This is the usual path and should therefore be optimized. |
| | | * |
| | | * It does not check if the operation to process is conflicting or not with |
| | | * previous operations. The caller is responsible for this. |
| | |
| | | * @param changeNumber The changeNumber of the operation to process |
| | | * @param mod The modify operation to process. |
| | | */ |
| | | public void processLocalOrNonConflictModification(ChangeNumber changeNumber, |
| | | private void processLocalOrNonConflictModification(ChangeNumber changeNumber, |
| | | Modification mod) |
| | | { |
| | | /* |
| | |
| | | * @param modAttr the attribute modification |
| | | * @return false if there is nothing to do |
| | | */ |
| | | public boolean conflictDelete(ChangeNumber changeNumber, |
| | | private boolean conflictDelete(ChangeNumber changeNumber, |
| | | AttributeType type, Modification m, |
| | | Entry modifiedEntry, |
| | | AttrInfo attrInfo, Attribute modAttr ) |
| | |
| | | * @param options the options that are added |
| | | * @return false if operation becomes empty and must not be processed |
| | | */ |
| | | public boolean conflictAdd(Iterator modsIterator, ChangeNumber changeNumber, |
| | | private boolean conflictAdd(Iterator modsIterator, ChangeNumber changeNumber, |
| | | AttrInfo attrInfo, |
| | | LinkedHashSet<AttributeValue> addValues, |
| | | Set<String> options) |
| | |
| | | */ |
| | | package org.opends.server.synchronization; |
| | | |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.core.Operation; |
| | | import org.opends.server.protocols.asn1.ASN1Exception; |
| | |
| | | int msgID = MSGID_ERROR_REPLAYING_OPERATION; |
| | | String message = getMessage(msgID, |
| | | op.getResultCode().getResultCodeName(), |
| | | changeNumber.toString(), |
| | | op.toString(), op.getErrorMessage()); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | listener.updateError(changeNumber); |
| | | } |
| | | } catch (Exception e) |
| | | { |
| | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | listener.updateError(changeNumber); |
| | | } |
| | | } |
| | | catch (ASN1Exception e) |
| | | { |
| | | int msgID = MSGID_EXCEPTION_DECODING_OPERATION; |
| | | String message = getMessage(msgID) + msg + |
| | | String message = getMessage(msgID, msg) + |
| | | stackTraceToSingleLineString(e); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | |
| | | catch (LDAPException e) |
| | | { |
| | | int msgID = MSGID_EXCEPTION_DECODING_OPERATION; |
| | | String message = getMessage(msgID) + msg + |
| | | String message = getMessage(msgID, msg) + |
| | | stackTraceToSingleLineString(e); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | | message, msgID); |
| | | } |
| | | catch (DataFormatException e) |
| | | { |
| | | int msgID = MSGID_EXCEPTION_DECODING_OPERATION; |
| | | String message = getMessage(msgID, msg) + |
| | | stackTraceToSingleLineString(e); |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.SEVERE_ERROR, |
| | |
| | | */ |
| | | package org.opends.server.synchronization; |
| | | |
| | | import static org.opends.server.protocols.ldap.LDAPConstants.*; |
| | | import static org.opends.server.synchronization.SynchMessages.SYNCHRONIZATION; |
| | | |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.core.ModifyDNOperation; |
| | | import org.opends.server.core.Operation; |
| | |
| | | { |
| | | dn = op.getRawEntryDN().stringValue(); |
| | | deleteOldRdn = op.deleteOldRDN(); |
| | | newSuperior = op.getRawNewSuperior().stringValue(); |
| | | if (op.getRawNewSuperior() != null) |
| | | newSuperior = op.getRawNewSuperior().stringValue(); |
| | | else |
| | | newSuperior = null; |
| | | newRDN = op.getRawNewRDN().stringValue(); |
| | | changeNumber = (ChangeNumber) op.getAttachment(SYNCHRONIZATION); |
| | | } |
| | |
| | | * Creates a new Add message from a byte[]. |
| | | * |
| | | * @param in The byte[] from which the operation must be read. |
| | | * @throws Exception The input byte[] is not a valid AddMsg |
| | | * @throws DataFormatException The input byte[] is not a valid AddMsg |
| | | */ |
| | | public ModifyDNMsg(byte[] in) throws Exception |
| | | public ModifyDNMsg(byte[] in) throws DataFormatException |
| | | { |
| | | /* first byte is the type */ |
| | | if (in[0] != OP_TYPE_MODIFY_DN_REQUEST) |
| | | throw new Exception("byte[] is not a valid add msg"); |
| | | if (in[0] != MSG_TYPE_MODIFYDN_REQUEST) |
| | | throw new DataFormatException("byte[] is not a valid add msg"); |
| | | int pos = 1; |
| | | |
| | | /* read the dn |
| | |
| | | while (in[pos++] != 0) |
| | | { |
| | | if (pos > in.length) |
| | | throw new Exception("byte[] is not a valid add msg"); |
| | | throw new DataFormatException("byte[] is not a valid add msg"); |
| | | length++; |
| | | } |
| | | dn = new String(in, offset, length, "UTF-8"); |
| | | |
| | | /* read the changeNumber |
| | | * it is always 24 characters long |
| | | */ |
| | | String changenumberStr = new String(in, pos, 24, "UTF-8"); |
| | | changeNumber = new ChangeNumber(changenumberStr); |
| | | pos +=24; |
| | | |
| | | /* read the newRDN |
| | | * first calculate the length then construct the string |
| | | */ |
| | | length = 0; |
| | | offset = pos; |
| | | while (in[pos++] != 0) |
| | | try |
| | | { |
| | | if (pos > in.length) |
| | | throw new Exception("byte[] is not a valid add msg"); |
| | | length++; |
| | | } |
| | | newRDN = new String(in, offset, length, "UTF-8"); |
| | | dn = new String(in, offset, length, "UTF-8"); |
| | | |
| | | /* read the newSuperior |
| | | * first calculate the length then construct the string |
| | | */ |
| | | length = 0; |
| | | offset = pos; |
| | | while (in[pos++] != 0) |
| | | /* read the changeNumber |
| | | * it is always 24 characters long |
| | | */ |
| | | String changenumberStr = new String(in, pos, 24, "UTF-8"); |
| | | changeNumber = new ChangeNumber(changenumberStr); |
| | | pos +=24; |
| | | |
| | | /* read the newRDN |
| | | * first calculate the length then construct the string |
| | | */ |
| | | length = 0; |
| | | offset = pos; |
| | | while (in[pos++] != 0) |
| | | { |
| | | if (pos > in.length) |
| | | throw new DataFormatException("byte[] is not a valid add msg"); |
| | | length++; |
| | | } |
| | | newRDN = new String(in, offset, length, "UTF-8"); |
| | | |
| | | /* read the newSuperior |
| | | * first calculate the length then construct the string |
| | | */ |
| | | length = 0; |
| | | offset = pos; |
| | | while (in[pos++] != 0) |
| | | { |
| | | if (pos > in.length) |
| | | throw new DataFormatException("byte[] is not a valid add msg"); |
| | | length++; |
| | | } |
| | | if (length != 0) |
| | | newSuperior = new String(in, offset, length, "UTF-8"); |
| | | else |
| | | newSuperior = null; |
| | | |
| | | /* get the deleteoldrdn flag */ |
| | | if (in[pos] == 0) |
| | | deleteOldRdn = false; |
| | | else |
| | | deleteOldRdn = true; |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | if (pos > in.length) |
| | | throw new Exception("byte[] is not a valid add msg"); |
| | | length++; |
| | | throw new DataFormatException("UTF-8 is not supported by this jvm."); |
| | | } |
| | | newSuperior = new String(in, offset, length, "UTF-8"); |
| | | |
| | | /* get the deleteoldrdn flag */ |
| | | if (in[pos] == 0) |
| | | deleteOldRdn = false; |
| | | else |
| | | deleteOldRdn = true; |
| | | } |
| | | |
| | | /** |
| | |
| | | public Operation createOperation(InternalClientConnection connection) |
| | | { |
| | | ModifyDNOperation moddn = new ModifyDNOperation(connection, |
| | | InternalClientConnection.nextOperationID(), |
| | | InternalClientConnection.nextMessageID(), null, |
| | | new ASN1OctetString(dn), |
| | | new ASN1OctetString(newRDN), |
| | | deleteOldRdn, |
| | | new ASN1OctetString(newSuperior)); |
| | | InternalClientConnection.nextOperationID(), |
| | | InternalClientConnection.nextMessageID(), null, |
| | | new ASN1OctetString(dn), new ASN1OctetString(newRDN), |
| | | deleteOldRdn, |
| | | (newSuperior == null ? null : new ASN1OctetString(newSuperior))); |
| | | moddn.setAttachment(SYNCHRONIZATION, getChangeNumber()); |
| | | return moddn; |
| | | } |
| | |
| | | * @return The byte array representation of this Message. |
| | | */ |
| | | @Override |
| | | public byte[] getByte() |
| | | public byte[] getBytes() |
| | | { |
| | | try |
| | | { |
| | |
| | | int pos = 1; |
| | | |
| | | /* put the type of the operation */ |
| | | resultByteArray[0] = OP_TYPE_MODIFY_DN_REQUEST; |
| | | resultByteArray[0] = MSG_TYPE_MODIFYDN_REQUEST; |
| | | |
| | | /* put the DN and a terminating 0 */ |
| | | for (int i = 0; i< byteDn.length; i++,pos++) |
| | |
| | | /* put the new RDN and a terminating 0 */ |
| | | for (int i = 0; i< byteNewRdn.length; i++,pos++) |
| | | { |
| | | resultByteArray[pos] = byteDn[i]; |
| | | resultByteArray[pos] = byteNewRdn[i]; |
| | | } |
| | | resultByteArray[pos++] = 0; |
| | | |
| | |
| | | { |
| | | for (int i = 0; i< byteNewSuperior.length; i++,pos++) |
| | | { |
| | | resultByteArray[pos] = byteDn[i]; |
| | | resultByteArray[pos] = byteNewSuperior[i]; |
| | | } |
| | | resultByteArray[pos++] = 0; |
| | | } |
| | |
| | | package org.opends.server.synchronization; |
| | | |
| | | import static org.opends.server.synchronization.SynchMessages.SYNCHRONIZATION; |
| | | import static org.opends.server.protocols.ldap.LDAPConstants.*; |
| | | |
| | | import org.opends.server.core.ModifyOperation; |
| | | import org.opends.server.core.Operation; |
| | |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | /** |
| | | * Message used to send Modify information. |
| | |
| | | public class ModifyMsg extends UpdateMessage |
| | | { |
| | | private static final long serialVersionUID = -4905520652801395185L; |
| | | private String dn; |
| | | private byte[] encodedMods; |
| | | private String dn = null; |
| | | private byte[] encodedMods = null; |
| | | private byte[] encodedMsg = null; |
| | | |
| | | /** |
| | | * Creates a new Modify message from a ModifyOperation. |
| | |
| | | * Creates a new Modify message from a byte[]. |
| | | * |
| | | * @param in The byte[] from which the operation must be read. |
| | | * @throws Exception The input byte[] is not a valid modifyMsg |
| | | * @throws DataFormatException If the input byte[] is not a valid modifyMsg |
| | | * @throws UnsupportedEncodingException If UTF8 is not supported by the JVM. |
| | | */ |
| | | public ModifyMsg(byte[] in) throws Exception |
| | | public ModifyMsg(byte[] in) throws DataFormatException, |
| | | UnsupportedEncodingException |
| | | { |
| | | /* first byte is the type */ |
| | | if (in[0] != OP_TYPE_MODIFY_REQUEST) |
| | | throw new Exception("byte[] is not a valid modify msg"); |
| | | encodedMsg = in; |
| | | decodeChangeNumber(in); |
| | | } |
| | | |
| | | private void decodeChangeNumber(byte[] in) throws DataFormatException, |
| | | UnsupportedEncodingException |
| | | { |
| | | /* read the changeNumber */ |
| | | int pos = 1; |
| | | int length = getNextLength(encodedMsg, pos); |
| | | String changenumberStr = new String(encodedMsg, pos, length, "UTF-8"); |
| | | pos += length +1; |
| | | changeNumber = new ChangeNumber(changenumberStr); |
| | | } |
| | | |
| | | /** |
| | | * Get the byte array representation of this Message. |
| | | * |
| | | * @return The byte array representation of this Message. |
| | | */ |
| | | @Override |
| | | public byte[] getBytes() |
| | | { |
| | | if (encodedMsg == null) |
| | | { |
| | | try |
| | | { |
| | | encode(); |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | // should never happens : TODO : log some error |
| | | return null; |
| | | } |
| | | } |
| | | return encodedMsg; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public Operation createOperation(InternalClientConnection connection) |
| | | throws LDAPException, ASN1Exception, DataFormatException |
| | | { |
| | | if (encodedMods == null) |
| | | { |
| | | decode(); |
| | | } |
| | | |
| | | ArrayList<LDAPModification> ldapmods; |
| | | |
| | | ArrayList<ASN1Element> mods = null; |
| | | |
| | | mods = ASN1Element.decodeElements(encodedMods); |
| | | |
| | | ldapmods = new ArrayList<LDAPModification>(mods.size()); |
| | | for (ASN1Element elem : mods) |
| | | ldapmods.add(LDAPModification.decode(elem)); |
| | | |
| | | ModifyOperation mod = new ModifyOperation(connection, |
| | | InternalClientConnection.nextOperationID(), |
| | | InternalClientConnection.nextMessageID(), null, |
| | | new ASN1OctetString(dn), ldapmods); |
| | | mod.setAttachment(SYNCHRONIZATION, getChangeNumber()); |
| | | return mod; |
| | | } |
| | | |
| | | /** |
| | | * Encode the Msg information into a byte array. |
| | | * |
| | | * @throws UnsupportedEncodingException If utf8 is not suported. |
| | | */ |
| | | private void encode() throws UnsupportedEncodingException |
| | | { |
| | | byte[] byteDn = dn.getBytes("UTF-8"); |
| | | byte[] changeNumberByte = |
| | | this.getChangeNumber().toString().getBytes("UTF-8"); |
| | | |
| | | /* The Modify message is stored in the form : |
| | | * <operation type>changenumber><dn><<mods> |
| | | * the length of result byte array is therefore : |
| | | * 1 + dn length + 1 + 24 + mods length |
| | | */ |
| | | int length = 1 + changeNumberByte.length + 1 + byteDn.length + 1 |
| | | + encodedMods.length + 1; |
| | | encodedMsg = new byte[length]; |
| | | |
| | | /* put the type of the operation */ |
| | | encodedMsg[0] = MSG_TYPE_MODIFY_REQUEST; |
| | | int pos = 1; |
| | | |
| | | /* read the dn |
| | | * first calculate the length then construct the string |
| | | */ |
| | | int length = 0; |
| | | int offset = pos; |
| | | while (in[pos++] != 0) |
| | | { |
| | | if (pos > in.length) |
| | | throw new Exception("byte[] is not a valid modify msg"); |
| | | length++; |
| | | } |
| | | dn = new String(in, offset, length, "UTF-8"); |
| | | /* put the ChangeNumber */ |
| | | pos = addByteArray(changeNumberByte, encodedMsg, pos); |
| | | |
| | | /* read the changeNumber |
| | | * it is always 24 characters long |
| | | */ |
| | | String changenumberStr = new String(in, pos, 24, "UTF-8"); |
| | | changeNumber = new ChangeNumber(changenumberStr); |
| | | pos +=24; |
| | | /* put the DN and a terminating 0 */ |
| | | pos = addByteArray(byteDn, encodedMsg, pos); |
| | | |
| | | /* Read the mods : all the remaining bytes */ |
| | | encodedMods = new byte[in.length-pos]; |
| | | int i =0; |
| | | while (pos<in.length) |
| | | /* put the mods */ |
| | | pos = addByteArray(encodedMods, encodedMsg, pos); |
| | | } |
| | | |
| | | /** |
| | | * Decode the encodedMsg into mods and dn. |
| | | * |
| | | * @throws DataFormatException when the encodedMsg is no a valid modify. |
| | | */ |
| | | private void decode() throws DataFormatException |
| | | { |
| | | /* first byte is the type */ |
| | | if (encodedMsg[0] != MSG_TYPE_MODIFY_REQUEST) |
| | | throw new DataFormatException("byte[] is not a valid modify msg"); |
| | | |
| | | try |
| | | { |
| | | encodedMods[i++] = in[pos++]; |
| | | /* read the changeNumber */ |
| | | int pos = 1; |
| | | int length = getNextLength(encodedMsg, pos); |
| | | String changenumberStr = new String(encodedMsg, pos, length, "UTF-8"); |
| | | pos += length +1; |
| | | changeNumber = new ChangeNumber(changenumberStr); |
| | | |
| | | /* read the dn */ |
| | | length = getNextLength(encodedMsg, pos); |
| | | dn = new String(encodedMsg, pos, length, "UTF-8"); |
| | | pos += length +1; |
| | | |
| | | /* Read the mods : all the remaining bytes but the terminating 0 */ |
| | | encodedMods = new byte[encodedMsg.length-pos-1]; |
| | | int i =0; |
| | | while (pos<encodedMsg.length-1) |
| | | { |
| | | encodedMods[i++] = encodedMsg[pos++]; |
| | | } |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | throw new DataFormatException("UTF-8 is not supported by this jvm."); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Create an operation from this Modify message. |
| | | * |
| | | * @param connection The connection to use when creating the operation. |
| | | * @return The created operation. |
| | | * @throws LDAPException In case of ldap decoding exception. |
| | | * @throws ASN1Exception In case of ASN1 decoding exception. |
| | | */ |
| | | @Override |
| | | public Operation createOperation(InternalClientConnection connection) |
| | | throws LDAPException, ASN1Exception |
| | | { |
| | | ArrayList<LDAPModification> ldapmods; |
| | | |
| | | ArrayList<ASN1Element> mods = null; |
| | | |
| | | mods = ASN1Element.decodeElements(encodedMods); |
| | | |
| | | ldapmods = new ArrayList<LDAPModification>(mods.size()); |
| | | for (ASN1Element elem : mods) |
| | | ldapmods.add(LDAPModification.decode(elem)); |
| | | |
| | | ModifyOperation mod = new ModifyOperation(connection, |
| | | InternalClientConnection.nextOperationID(), |
| | | InternalClientConnection.nextMessageID(), null, |
| | | new ASN1OctetString(dn), ldapmods); |
| | | mod.setAttachment(SYNCHRONIZATION, getChangeNumber()); |
| | | return mod; |
| | | } |
| | | |
| | | /** |
| | | * Get the byte array representation of this Message. |
| | | * |
| | | * @return The byte array representation of this Message. |
| | | */ |
| | | @Override |
| | | public byte[] getByte() |
| | | { |
| | | byte[] byteDn; |
| | | try |
| | | { |
| | | byteDn = dn.getBytes("UTF-8"); |
| | | |
| | | /* The Modify message is stored in the form : |
| | | * <operation type><dn><changenumber><mods> |
| | | * the length of result byte array is therefore : |
| | | * 1 + dn length + 1 + 24 + mods length |
| | | */ |
| | | int length = 1 + byteDn.length + 1 + 24 + encodedMods.length; |
| | | byte[] resultByteArray = new byte[length]; |
| | | int pos = 1; |
| | | |
| | | /* put the type of the operation */ |
| | | resultByteArray[0] = OP_TYPE_MODIFY_REQUEST; |
| | | /* put the DN and a terminating 0 */ |
| | | for (int i = 0; i< byteDn.length; i++,pos++) |
| | | { |
| | | resultByteArray[pos] = byteDn[i]; |
| | | } |
| | | resultByteArray[pos++] = 0; |
| | | /* put the ChangeNumber */ |
| | | byte[] changeNumberByte = |
| | | this.getChangeNumber().toString().getBytes("UTF-8"); |
| | | for (int i=0; i<24; i++,pos++) |
| | | { |
| | | resultByteArray[pos] = changeNumberByte[i]; |
| | | } |
| | | |
| | | /* put the mods */ |
| | | for (int i=0; i<encodedMods.length; i++,pos++) |
| | | { |
| | | resultByteArray[pos] = encodedMods[i]; |
| | | } |
| | | return resultByteArray; |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | // should never happens : TODO : log some error |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | |
| | | private static Map<DN, SynchronizationDomain> domains = |
| | | new HashMap<DN, SynchronizationDomain>() ; |
| | | |
| | | |
| | | /** |
| | | * InitializePlugin function. |
| | | * called at server initialization time |
| | | * @param configEntry the Entry that contains configuration |
| | | * @throws ConfigException if config is invalid |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public void initializeSynchronizationProvider(ConfigEntry configEntry) |
| | | throws ConfigException |
| | |
| | | } |
| | | |
| | | /** |
| | | * Post-Operation method, called by server when processing is complete. |
| | | * @param modifyDNOperation Operation for which the post-operation is called |
| | | * {@inheritDoc} |
| | | */ |
| | | public void doPostOperation(ModifyDNOperation modifyDNOperation) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Post-Operation method, called by server when processing is complete. |
| | | * @param modifyOperation Operation for which the post-operation is called |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void doPostOperation(ModifyOperation modifyOperation) |
| | | { |
| | | DN dn = modifyOperation.getEntryDN(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Handle the conflict resolution. |
| | | * Called by the core server after locking the entry and before |
| | | * starting the actual modification. |
| | | * @param modifyOperation the operation |
| | | * @return code indicating is operation must proceed |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public SynchronizationProviderResult handleConflictResolution( |
| | | ModifyOperation modifyOperation) |
| | | { |
| | |
| | | return domain.handleConflictResolution(modifyOperation); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Pre-operation processing. |
| | | * Called after operation has been processed by the core server |
| | | * but before being committed to the backend |
| | | * Generate the Change number and update the historical information |
| | | * @param modifyOperation the current operation |
| | | * @return code indicating if operation must be processed |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public SynchronizationProviderResult |
| | |
| | | } |
| | | |
| | | /** |
| | | * Pre-operation processing. |
| | | * Called after operation has been processed by the core server |
| | | * but before being committed to the backend |
| | | * Generate the Change number and update the historical information |
| | | * @param addOperation the current operation |
| | | * @return code indicating if operation must be processed |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public SynchronizationProviderResult doPreOperation(AddOperation addOperation) |
| | |
| | | * Called after operation has been processed by the core server |
| | | * but before being committed to the backend |
| | | * Generate the Change number and update the historical information |
| | | * |
| | | * @param deleteOperation the current operation |
| | | * @return code indicating if operation must be processed |
| | | */ |
| | |
| | | * Called after operation has been processed by the core server |
| | | * but before being committed to the backend |
| | | * Generate the Change number and update the historical information |
| | | * |
| | | * @param modifyDNOperation the current operation |
| | | * @return code indicating if operation must be processed |
| | | */ |
| | |
| | | return; |
| | | |
| | | domain.synchronize(operation); |
| | | |
| | | return; |
| | | } |
| | | |
| | | /** |
| | | * Get the ServerState associated to the SynchronizationDomain |
| | | * with a given DN. |
| | | * |
| | | * @param baseDn The DN of the Synchronization Domain for which the |
| | | * ServerState must be returned. |
| | | * @return the ServerState associated to the SynchronizationDomain |
| | |
| | | SynchronizationDomain domain = findDomain(baseDn); |
| | | return domain.getServerState(); |
| | | } |
| | | |
| | | } |
| | | |
| | | |
| | |
| | | package org.opends.server.synchronization; |
| | | |
| | | import java.io.Serializable; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.net.InetAddress; |
| | | import java.net.UnknownHostException; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.core.DirectoryException; |
| | |
| | | { |
| | | private static final long serialVersionUID = 8649393307038290287L; |
| | | |
| | | private short ServerId; // Id of the LDAP server that sent this message |
| | | private short serverId; // Id of the LDAP server that sent this message |
| | | private String serverURL; |
| | | private String baseDn; |
| | | private ServerState serverState = null; |
| | | private int maxReceiveQueue; |
| | | private int maxSendQueue; |
| | | private int maxReceiveDelay; |
| | | private int maxSendDelay; |
| | | |
| | | |
| | | // TODO : should have a RUV here |
| | | |
| | | private String serverURL; |
| | | private ServerState serverState = null; |
| | | |
| | | /** |
| | | * Create a new ServerStartMessage. |
| | |
| | | int maxReceiveQueue, int maxSendDelay, |
| | | int maxSendQueue, ServerState serverState) |
| | | { |
| | | this.ServerId = serverId; |
| | | this.serverId = serverId; |
| | | this.baseDn = baseDn.toString(); |
| | | this.maxReceiveDelay = maxReceiveDelay; |
| | | this.maxReceiveQueue = maxReceiveQueue; |
| | |
| | | } |
| | | |
| | | /** |
| | | * Creates a new ServerStartMessage from its encoded form. |
| | | * |
| | | * @param in The byte array containing the encoded form of the |
| | | * ServerStartMessage. |
| | | * @throws DataFormatException If the byte array does not contain a valid |
| | | * encoded form of the ServerStartMessage. |
| | | */ |
| | | public ServerStartMessage(byte[] in) throws DataFormatException |
| | | { |
| | | /* The ServerStartMessage is encoded in the form : |
| | | * <operation type><baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue> |
| | | * <maxSendDelay><maxSendQueue><ServerState> |
| | | */ |
| | | try |
| | | { |
| | | /* first byte is the type */ |
| | | if (in[0] != MSG_TYPE_SERVER_START) |
| | | throw new DataFormatException("input is not a valid ServerStart msg"); |
| | | int pos = 1; |
| | | |
| | | /* |
| | | * read the dn |
| | | * first calculate the length then construct the string |
| | | */ |
| | | int length = getNextLength(in, pos); |
| | | baseDn = new String(in, pos, length, "UTF-8"); |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the ServerId |
| | | */ |
| | | length = getNextLength(in, pos); |
| | | String serverIdString = new String(in, pos, length, "UTF-8"); |
| | | serverId = Short.valueOf(serverIdString); |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the ServerURL |
| | | */ |
| | | length = getNextLength(in, pos); |
| | | serverURL = new String(in, pos, length, "UTF-8"); |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the maxReceiveDelay |
| | | */ |
| | | length = getNextLength(in, pos); |
| | | maxReceiveDelay = Integer.valueOf(new String(in, pos, length, "UTF-8")); |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the maxReceiveQueue |
| | | */ |
| | | length = getNextLength(in, pos); |
| | | maxReceiveQueue = Integer.valueOf(new String(in, pos, length, "UTF-8")); |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the maxSendDelay |
| | | */ |
| | | length = getNextLength(in, pos); |
| | | maxSendDelay = Integer.valueOf(new String(in, pos, length, "UTF-8")); |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the maxSendQueue |
| | | */ |
| | | length = getNextLength(in, pos); |
| | | maxSendQueue = Integer.valueOf(new String(in, pos, length, "UTF-8")); |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the ServerState |
| | | */ |
| | | serverState = new ServerState(in, pos, in.length-1); |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | throw new DataFormatException("UTF-8 is not supported by this jvm."); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get the ServerID from the message. |
| | | * @return the server ID |
| | | */ |
| | | public short getServerId() |
| | | { |
| | | return ServerId; |
| | | return serverId; |
| | | } |
| | | |
| | | /** |
| | |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public byte[] getBytes() |
| | | { |
| | | /* |
| | | * ServerStartMessage contains. |
| | | * <baseDn><ServerId><ServerUrl><maxRecvDelay><maxRecvQueue> |
| | | * <maxSendDelay><maxSendQueue><ServerState> |
| | | */ |
| | | try { |
| | | byte[] byteDn = baseDn.getBytes("UTF-8"); |
| | | byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8"); |
| | | byte[] byteServerUrl = serverURL.getBytes("UTF-8"); |
| | | byte[] byteMaxRecvDelay = |
| | | String.valueOf(maxReceiveDelay).getBytes("UTF-8"); |
| | | byte[] byteMaxRecvQueue = |
| | | String.valueOf(maxReceiveQueue).getBytes("UTF-8"); |
| | | byte[] byteMaxSendDelay = |
| | | String.valueOf(maxSendDelay).getBytes("UTF-8"); |
| | | byte[] byteMaxSendQueue = |
| | | String.valueOf(maxSendQueue).getBytes("UTF-8"); |
| | | byte[] byteServerState = serverState.getBytes(); |
| | | |
| | | int length = 1 + byteDn.length + 1 + byteServerId.length + 1 + |
| | | byteServerUrl.length + 1 + |
| | | byteMaxRecvDelay.length + 1 + |
| | | byteMaxRecvQueue.length + 1 + |
| | | byteMaxSendDelay.length + 1 + |
| | | byteMaxSendQueue.length + 1 + |
| | | byteServerState.length + 1; |
| | | |
| | | byte[] resultByteArray = new byte[length]; |
| | | |
| | | /* put the type of the operation */ |
| | | resultByteArray[0] = MSG_TYPE_SERVER_START; |
| | | int pos = 1; |
| | | |
| | | pos = addByteArray(byteDn, resultByteArray, pos); |
| | | |
| | | pos = addByteArray(byteServerId, resultByteArray, pos); |
| | | |
| | | pos = addByteArray(byteServerUrl, resultByteArray, pos); |
| | | |
| | | pos = addByteArray(byteMaxRecvDelay, resultByteArray, pos); |
| | | |
| | | pos = addByteArray(byteMaxRecvQueue, resultByteArray, pos); |
| | | |
| | | pos = addByteArray(byteMaxSendDelay, resultByteArray, pos); |
| | | |
| | | pos = addByteArray(byteMaxSendQueue, resultByteArray, pos); |
| | | |
| | | pos = addByteArray(byteServerState, resultByteArray, pos); |
| | | |
| | | return resultByteArray; |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | return null; |
| | | } |
| | | } |
| | | } |
| | |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.Set; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import java.io.Serializable; |
| | | import java.io.UnsupportedEncodingException; |
| | | |
| | | import org.opends.server.core.AddOperation; |
| | | import org.opends.server.types.Control; |
| | |
| | | * from each server. |
| | | * It is exchanged with the changelog servers at connection establishment time |
| | | * It is locally saved in the database |
| | | * TODO : should extract from this object the code that read/save |
| | | * from/to the database on the LDAP server side and put it in a new class |
| | | * that is only used on the LDAP server side and that encapsulate this class. |
| | | */ |
| | | public class ServerState implements Serializable |
| | | { |
| | | private static final long serialVersionUID = 314772980474416183L; |
| | | private static final String SYNCHRONIZATION_STATE = "ds-sync-state"; |
| | | |
| | | private HashMap<Short, ChangeNumber> list; |
| | | transient private static final String |
| | | SYNCHRONIZATION_STATE = "ds-sync-state"; |
| | | transient private DN baseDn; |
| | | transient boolean savedStatus = true; |
| | | transient private InternalClientConnection conn = |
| | | new InternalClientConnection(); |
| | | new InternalClientConnection(); |
| | | transient private ASN1OctetString serverStateAsn1Dn; |
| | | transient private DN serverStateDn; |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Creates a new ServerState object from its encoded form. |
| | | * |
| | | * @param in The byte array containing the encoded ServerState form. |
| | | * @param pos The position in the byte array where the encoded ServerState |
| | | * starts. |
| | | * @param endpos The position in the byte array where the encoded ServerState |
| | | * ends. |
| | | * @throws DataFormatException If the encoded form was not correct. |
| | | */ |
| | | public ServerState(byte[] in, int pos, int endpos) |
| | | throws DataFormatException |
| | | { |
| | | try |
| | | { |
| | | list = new HashMap<Short, ChangeNumber>(); |
| | | |
| | | while (endpos > pos) |
| | | { |
| | | /* |
| | | * read the ServerId |
| | | */ |
| | | int length = getNextLength(in, pos); |
| | | String serverIdString = new String(in, pos, length, "UTF-8"); |
| | | short serverId = Short.valueOf(serverIdString); |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * read the ChangeNumber |
| | | */ |
| | | length = getNextLength(in, pos); |
| | | String cnString = new String(in, pos, length, "UTF-8"); |
| | | ChangeNumber cn = new ChangeNumber(cnString); |
| | | pos += length +1; |
| | | |
| | | /* |
| | | * Add the serverid |
| | | */ |
| | | list.put(serverId, cn); |
| | | } |
| | | |
| | | } catch (UnsupportedEncodingException e) |
| | | { |
| | | throw new DataFormatException("UTF-8 is not supported by this jvm."); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Get the length of the next String encoded in the in byte array. |
| | | * |
| | | * @param in the byte array where to calculate the string. |
| | | * @param pos the position whre to start from in the byte array. |
| | | * @return the length of the next string. |
| | | * @throws DataFormatException If the byte array does not end with null. |
| | | */ |
| | | private int getNextLength(byte[] in, int pos) throws DataFormatException |
| | | { |
| | | int offset = pos; |
| | | int length = 0; |
| | | while (in[offset++] != 0) |
| | | { |
| | | if (offset >= in.length) |
| | | throw new DataFormatException("byte[] is not a valid modify msg"); |
| | | length++; |
| | | } |
| | | return length; |
| | | } |
| | | |
| | | /** |
| | | * Update the Server State with a ChangeNumber. |
| | | * All operations with smaller CSN and the same serverID must be committed |
| | | * before calling this method. |
| | |
| | | */ |
| | | public void save() |
| | | { |
| | | if (list.size() == 0) |
| | | if ((list.size() == 0) || savedStatus) |
| | | return; |
| | | |
| | | ArrayList<ASN1OctetString> values = new ArrayList<ASN1OctetString>(); |
| | |
| | | ASN1OctetString value = new ASN1OctetString(list.get(id).toString()); |
| | | values.add(value); |
| | | } |
| | | savedStatus = true; |
| | | } |
| | | LDAPAttribute attr = new LDAPAttribute(SYNCHRONIZATION_STATE, values); |
| | | LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr); |
| | |
| | | } |
| | | else |
| | | { |
| | | savedStatus = false; |
| | | int msgID = MSGID_ERROR_UPDATING_RUV; |
| | | String message = getMessage(msgID, |
| | | op.getResultCode().getResultCodeName(), |
| | |
| | | return serverStateDn; |
| | | } |
| | | |
| | | /** |
| | | * Add the tail into resultByteArray at position pos. |
| | | */ |
| | | private int addByteArray(byte[] tail, byte[] resultByteArray, int pos) |
| | | { |
| | | for (int i=0; i<tail.length; i++,pos++) |
| | | { |
| | | resultByteArray[pos] = tail[i]; |
| | | } |
| | | resultByteArray[pos++] = 0; |
| | | return pos; |
| | | } |
| | | |
| | | /** |
| | | * Encode this ServerState object and return its byte array representation. |
| | | * |
| | | * @return a byte array with an encoded representation of this object. |
| | | * @throws UnsupportedEncodingException if UTF8 is not supported by the JVM. |
| | | */ |
| | | public byte[] getBytes() throws UnsupportedEncodingException |
| | | { |
| | | synchronized (this) |
| | | { |
| | | int length = 0; |
| | | List<String> idList = new ArrayList<String>(list.size()); |
| | | for (short id : list.keySet()) |
| | | { |
| | | String temp = String.valueOf(id); |
| | | idList.add(temp); |
| | | length += temp.length() + 1; |
| | | } |
| | | List<String> cnList = new ArrayList<String>(list.size()); |
| | | for (ChangeNumber cn : list.values()) |
| | | { |
| | | String temp = cn.toString(); |
| | | cnList.add(temp); |
| | | length += temp.length() + 1; |
| | | } |
| | | byte[] result = new byte[length]; |
| | | |
| | | int pos = 0; |
| | | for (int i=0; i< list.size(); i++) |
| | | { |
| | | String str = idList.get(i); |
| | | pos = addByteArray(str.getBytes("UTF-8"), result, pos); |
| | | str = cnList.get(i); |
| | | pos = addByteArray(str.getBytes("UTF-8"), result, pos); |
| | | } |
| | | return result; |
| | | } |
| | | } |
| | | } |
| | |
| | | MessageHandler.registerMessage(MSGID_UNKNOWN_TYPE, |
| | | "Unknown operation type : %s"); |
| | | MessageHandler.registerMessage(MSGID_ERROR_REPLAYING_OPERATION, |
| | | "Error %s when replaying operation %s : %s"); |
| | | "Error %s when replaying operation with changenumber %s %s : %s"); |
| | | MessageHandler.registerMessage(MSGID_OPERATION_NOT_FOUND_IN_PENDING, |
| | | "Internal Error : Operation %s change number %s" + |
| | | " was not found in pending list"); |
| | |
| | | " The Changelog service is going to shutdown. "); |
| | | MessageHandler.registerMessage(MSGID_CHANGELOG_CONNECTION_ERROR, |
| | | "Error during Changelog service message processing ." + |
| | | " Connection %s is rejected. "); |
| | | " Connection from %s is rejected. "); |
| | | MessageHandler.registerMessage(MSGID_UNKNOWN_MESSAGE, |
| | | "%s has sent an unknown message. Closing the connection. "); |
| | | MessageHandler.registerMessage(MSGID_WRITER_UNEXPECTED_EXCEPTION, |
| | |
| | | maxReceiveQueue, maxSendDelay, maxSendQueue, |
| | | state); |
| | | } |
| | | |
| | | /** |
| | | * This methods is called when an error happends while replaying |
| | | * and operation. |
| | | * It is necessary because the postOPeration does not always get |
| | | * called when error or Exceptions happen during the operation replay. |
| | | * |
| | | * @param changeNumber the ChangeNumber of the operation with error. |
| | | */ |
| | | public void updateError(ChangeNumber changeNumber) |
| | | { |
| | | synchronized (pendingChanges) |
| | | { |
| | | pendingChanges.remove(changeNumber); |
| | | } |
| | | } |
| | | } |
| | |
| | | package org.opends.server.synchronization; |
| | | |
| | | import java.io.Serializable; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | /** |
| | | * Abstract class that must be used when defining messages that can |
| | | * be sent for synchronization purpose between servers. |
| | | * |
| | | * When extending this class one should also create a new MSG_TYPE |
| | | * and should update the generateMsg() method. |
| | | */ |
| | | public abstract class SynchronizationMessage implements Serializable |
| | | { |
| | | static final byte MSG_TYPE_MODIFY_REQUEST = 1; |
| | | static final byte MSG_TYPE_ADD_REQUEST = 2; |
| | | static final byte MSG_TYPE_DELETE_REQUEST = 3; |
| | | static final byte MSG_TYPE_MODIFYDN_REQUEST = 4; |
| | | static final byte MSG_TYPE_ACK = 5; |
| | | static final byte MSG_TYPE_SERVER_START = 6; |
| | | static final byte MSG_TYPE_CHANGELOG_START = 7; |
| | | |
| | | /** |
| | | * Do the processing necessary when the message is received. |
| | | * |
| | |
| | | * @return an UpdateMessage if the processing result is an UpdateMessage. |
| | | */ |
| | | public abstract UpdateMessage processReceive(SynchronizationDomain domain); |
| | | |
| | | /** |
| | | * Return the byte[] representation of this message. |
| | | * Depending on the message type, the first byte of the byte[] must be. |
| | | * MSG_TYPE_MODIFY_REQUEST |
| | | * MSG_TYPE_ADD_REQUEST |
| | | * MSG_TYPE_DELETE_REQUEST |
| | | * MSG_TYPE_MODIFY_DN_REQUEST |
| | | * MSG_TYPE_ACK |
| | | * MSG_TYPE_SERVER_START |
| | | * MSG_TYPE_CHANGELOG_START |
| | | * |
| | | * @return the byte[] representation of this message. |
| | | */ |
| | | public abstract byte[] getBytes(); |
| | | |
| | | |
| | | /** |
| | | * Generates a SynchronizationMessage from its encoded form. |
| | | * |
| | | * @param buffer The encode form of the SynchronizationMessage. |
| | | * @return the generated SycnhronizationMessage. |
| | | * @throws DataFormatException if the encoded form was not a valid msg. |
| | | * @throws UnsupportedEncodingException if UTF8 is not supported. |
| | | */ |
| | | public static SynchronizationMessage generateMsg(byte[] buffer) |
| | | throws DataFormatException, UnsupportedEncodingException |
| | | { |
| | | SynchronizationMessage msg = null; |
| | | switch (buffer[0]) |
| | | { |
| | | case MSG_TYPE_MODIFY_REQUEST: |
| | | msg = new ModifyMsg(buffer); |
| | | break; |
| | | case MSG_TYPE_ADD_REQUEST: |
| | | msg = new AddMsg(buffer); |
| | | break; |
| | | case MSG_TYPE_DELETE_REQUEST: |
| | | msg = new DeleteMsg(buffer); |
| | | break; |
| | | case MSG_TYPE_MODIFYDN_REQUEST: |
| | | msg = new ModifyDNMsg(buffer); |
| | | break; |
| | | case MSG_TYPE_ACK: |
| | | msg = new AckMessage(buffer); |
| | | break; |
| | | case MSG_TYPE_SERVER_START: |
| | | msg = new ServerStartMessage(buffer); |
| | | break; |
| | | case MSG_TYPE_CHANGELOG_START: |
| | | msg = new ChangelogStartMessage(buffer); |
| | | break; |
| | | default: |
| | | throw new DataFormatException("received message with unknown type"); |
| | | } |
| | | return msg; |
| | | } |
| | | |
| | | /** |
| | | * Concatenate the tail byte array into the resultByteArray. |
| | | * The resultByteArray must be large enough before calling this method. |
| | | * |
| | | * @param tail the byte array to concatenate. |
| | | * @param resultByteArray The byte array to concatenate to. |
| | | * @param pos the position where to concatenate. |
| | | * @return the next position to use in the resultByteArray. |
| | | */ |
| | | protected int addByteArray(byte[] tail, byte[] resultByteArray, int pos) |
| | | { |
| | | for (int i=0; i<tail.length; i++,pos++) |
| | | { |
| | | resultByteArray[pos] = tail[i]; |
| | | } |
| | | resultByteArray[pos++] = 0; |
| | | return pos; |
| | | } |
| | | |
| | | /** |
| | | * Get the length of the next String encoded in the in byte array. |
| | | * |
| | | * @param in the byte array where to calculate the string. |
| | | * @param pos the position whre to start from in the byte array. |
| | | * @return the length of the next string. |
| | | * @throws DataFormatException If the byte array does not end with null. |
| | | */ |
| | | protected int getNextLength(byte[] in, int pos) throws DataFormatException |
| | | { |
| | | int offset = pos; |
| | | int length = 0; |
| | | while (in[offset++] != 0) |
| | | { |
| | | if (offset >= in.length) |
| | | throw new DataFormatException("byte[] is not a valid modify msg"); |
| | | length++; |
| | | } |
| | | return length; |
| | | } |
| | | } |
| | |
| | | package org.opends.server.synchronization; |
| | | |
| | | import java.io.Serializable; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.core.Operation; |
| | | import org.opends.server.protocols.asn1.ASN1Exception; |
| | |
| | | * @return the created Operation |
| | | * @throws LDAPException In case of LDAP decoding exception. |
| | | * @throws ASN1Exception In case of ASN1 decoding exception. |
| | | * @throws DataFormatException In case of bad msg format. |
| | | */ |
| | | public abstract Operation createOperation(InternalClientConnection conn) |
| | | throws LDAPException, ASN1Exception; |
| | | throws LDAPException, ASN1Exception, DataFormatException; |
| | | |
| | | /** |
| | | * Return the byte[] representation on this message. |
| | | * Depending on the message type, the first byte of the byte[] must be. |
| | | * org.opends.server.com.protocols.ldap.LDAPConstants.OP_TYPE_MODIFY_REQUEST |
| | | * OP_TYPE_ADD_REQUEST |
| | | * OP_TYPE_DELETE_REQUEST |
| | | * OP_TYPE_MODIFY_DN_REQUEST |
| | | * |
| | | * @return the byte[] representation of this message. |
| | | */ |
| | | public abstract byte[] getByte(); |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | |
| | | domain.receiveUpdate(this); |
| | | return this; |
| | | } |
| | | |
| | | |
| | | } |