From 7adb93986ace907531875e25be1f94d735fbb068 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 08 Dec 2008 08:03:33 +0000
Subject: [PATCH] Merge the replication-service branch with the OpenDS trunk
---
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java | 596 ++++++++++++++++++-----------------------------------------
1 files changed, 185 insertions(+), 411 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
index a2ce835..ca50458 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -26,26 +26,17 @@
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
-import org.opends.server.protocols.asn1.ASN1Exception;
-import org.opends.server.protocols.internal.InternalClientConnection;
+import java.io.UnsupportedEncodingException;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
-import org.opends.server.types.AbstractOperation;
-import org.opends.server.types.LDAPException;
-import org.opends.server.types.operation.PostOperationAddOperation;
-import org.opends.server.types.operation.PostOperationDeleteOperation;
-import org.opends.server.types.operation.PostOperationModifyDNOperation;
-import org.opends.server.types.operation.PostOperationModifyOperation;
-import org.opends.server.types.operation.PostOperationOperation;
/**
* Abstract class that must be extended to define a message
* used for sending Updates between servers.
*/
-public abstract class UpdateMsg extends ReplicationMsg
+public class UpdateMsg extends ReplicationMsg
implements Comparable<UpdateMsg>
{
/**
@@ -59,11 +50,6 @@
protected ChangeNumber changeNumber;
/**
- * The DN on which the update was originally done.
- */
- protected String dn = null;
-
- /**
* True when the update must use assured replication.
*/
protected boolean assuredFlag = false;
@@ -76,83 +62,62 @@
/**
* When assured mode is safe data, gives the requested level.
*/
- protected byte safeDataLevel = (byte)-1;
+ protected byte safeDataLevel = (byte)1;
/**
- * The uniqueId of the entry that was updated.
+ * The payload that must be encoded in this message.
*/
- protected String uniqueId;
+ private byte[] payload;
+
/**
- * Creates a new UpdateMsg.
+ * Creates a new empty UpdateMsg.
*/
- public UpdateMsg()
+ protected UpdateMsg()
+ {}
+
+ /**
+ * Creates a new UpdateMsg with the given informations.
+ *
+ * @param bytes A Byte Array with the encoded form of the message.
+ *
+ * @throws DataFormatException If bytes is not valid.
+ */
+ UpdateMsg(byte[] bytes) throws DataFormatException
{
+ // Decode header
+ int pos = decodeHeader(MSG_TYPE_GENERIC_UPDATE, bytes);
+
+ /* Read the payload : all the remaining bytes but the terminating 0 */
+ int length = bytes.length - pos;
+ payload = new byte[length];
+ try
+ {
+ System.arraycopy(bytes, pos, payload, 0, length);
+ } catch (IndexOutOfBoundsException e)
+ {
+ throw new DataFormatException(e.getMessage());
+ } catch (ArrayStoreException e)
+ {
+ throw new DataFormatException(e.getMessage());
+ } catch (NullPointerException e)
+ {
+ throw new DataFormatException(e.getMessage());
+ }
}
/**
* Creates a new UpdateMsg with the given informations.
*
- * @param ctx The replication Context of the operation for which the
- * update message must be created,.
- * @param dn The DN of the entry on which the change
- * that caused the creation of this object happened
+ * @param changeNumber The ChangeNumber associated with the change
+ * encoded in this message.
+ * @param payload The payload that must be encoded in this message.
*/
- public UpdateMsg(OperationContext ctx, String dn)
+ public UpdateMsg(ChangeNumber changeNumber, byte[] payload)
{
+ this.payload = payload;
this.protocolVersion = ProtocolVersion.getCurrentVersion();
- this.changeNumber = ctx.getChangeNumber();
- this.uniqueId = ctx.getEntryUid();
- this.dn = dn;
- }
-
- /**
- * Creates a new UpdateMessage with the given informations.
- *
- * @param cn The ChangeNumber of the operation for which the
- * UpdateMessage is created.
- * @param entryUUID The Unique identifier of the entry that is updated
- * by the operation for which the UpdateMessage is created.
- * @param dn The DN of the entry on which the change
- * that caused the creation of this object happened
- */
- public UpdateMsg(ChangeNumber cn, String entryUUID, String dn)
- {
- this.protocolVersion = ProtocolVersion.getCurrentVersion();
- this.changeNumber = cn;
- this.uniqueId = entryUUID;
- this.dn = dn;
- }
-
- /**
- * Generates an Update Message with the provided information.
- *
- * @param op The operation for which the message must be created.
- * @return The generated message.
- */
- public static UpdateMsg generateMsg(PostOperationOperation op)
- {
- UpdateMsg msg = null;
- switch (op.getOperationType())
- {
- case MODIFY :
- msg = new ModifyMsg((PostOperationModifyOperation) op);
- break;
-
- case ADD:
- msg = new AddMsg((PostOperationAddOperation) op);
- break;
-
- case DELETE :
- msg = new DeleteMsg((PostOperationDeleteOperation) op);
- break;
-
- case MODIFY_DN :
- msg = new ModifyDNMsg( (PostOperationModifyDNOperation) op);
- break;
- }
-
- return msg;
+ this.changeNumber = changeNumber;
}
/**
@@ -165,35 +130,6 @@
}
/**
- * Get the DN on which the operation happened.
- *
- * @return The DN on which the operations happened.
- */
- public String getDn()
- {
- return dn;
- }
-
- /**
- * Set the DN.
- * @param dn The dn that must now be used for this message.
- */
- public void setDn(String dn)
- {
- this.dn = dn;
- }
-
- /**
- * Get the Unique Identifier of the entry on which the operation happened.
- *
- * @return The Unique Identifier of the entry on which the operation happened.
- */
- public String getUniqueId()
- {
- return uniqueId;
- }
-
- /**
* Get a boolean indicating if the Update must be processed as an
* Asynchronous or as an assured replication.
*
@@ -250,96 +186,6 @@
return changeNumber.compareTo(msg.getChangeNumber());
}
- /**
- * Create and Operation from the message.
- *
- * @param conn connection to use when creating the message
- * @return the created Operation
- * @throws LDAPException In case of LDAP decoding exception.
- * @throws ASN1Exception In case of ASN1 decoding exception.
- * @throws DataFormatException In case of bad msg format.
- */
- public AbstractOperation createOperation(InternalClientConnection conn)
- throws LDAPException, ASN1Exception, DataFormatException
- {
- return createOperation(conn, dn);
- }
-
-
- /**
- * Create and Operation from the message using the provided DN.
- *
- * @param conn connection to use when creating the message.
- * @param newDn the DN to use when creating the operation.
- * @return the created Operation.
- * @throws LDAPException In case of LDAP decoding exception.
- * @throws ASN1Exception In case of ASN1 decoding exception.
- * @throws DataFormatException In case of bad msg format.
- */
- public abstract AbstractOperation createOperation(
- InternalClientConnection conn, String newDn)
- throws LDAPException, ASN1Exception, DataFormatException;
-
- /**
- * Encode the common header for all the UpdateMsg. This uses the current
- * protocol version.
- *
- * @param type the type of UpdateMsg to encode.
- * @param additionalLength additional length needed to encode the remaining
- * part of the UpdateMsg.
- * @return a byte array containing the common header and enough space to
- * encode the remaining bytes of the UpdateMsg as was specified
- * by the additionalLength.
- * (byte array length = common header length + additionalLength)
- * @throws UnsupportedEncodingException if UTF-8 is not supported.
- */
- public byte[] encodeHeader(byte type, int additionalLength)
- throws UnsupportedEncodingException
- {
- byte[] byteDn = dn.getBytes("UTF-8");
- byte[] changeNumberByte =
- this.getChangeNumber().toString().getBytes("UTF-8");
- byte[] byteEntryuuid = getUniqueId().getBytes("UTF-8");
-
- /* The message header is stored in the form :
- * <operation type><protocol version><changenumber><dn><entryuuid><assured>
- * <assured mode> <safe data level>
- * the length of result byte array is therefore :
- * 1 + 1 + change number length + 1 + dn length + 1 + uuid length + 1 + 1
- * + 1 + 1 + additional_length
- */
- int length = 8 + changeNumberByte.length + byteDn.length
- + byteEntryuuid.length + additionalLength;
-
- byte[] encodedMsg = new byte[length];
-
- /* put the type of the operation */
- encodedMsg[0] = type;
-
- /* put the protocol version */
- encodedMsg[1] = (byte)ProtocolVersion.getCurrentVersion();
- int pos = 2;
-
- /* Put the ChangeNumber */
- pos = addByteArray(changeNumberByte, encodedMsg, pos);
-
- /* Put the DN and a terminating 0 */
- pos = addByteArray(byteDn, encodedMsg, pos);
-
- /* Put the entry uuid and a terminating 0 */
- pos = addByteArray(byteEntryuuid, encodedMsg, pos);
-
- /* Put the assured flag */
- encodedMsg[pos++] = (assuredFlag ? (byte) 1 : 0);
-
- /* Put the assured mode */
- encodedMsg[pos++] = assuredMode.getValue();
-
- /* Put the safe data level */
- encodedMsg[pos++] = safeDataLevel;
-
- return encodedMsg;
- }
/**
* {@inheritDoc}
@@ -353,226 +199,15 @@
{
return getBytes();
}
-
- // Supported older protocol versions
- switch (reqProtocolVersion)
+ else
{
- case ProtocolVersion.REPLICATION_PROTOCOL_V1:
- return getBytes_V1();
- default:
- // Unsupported requested version
- throw new UnsupportedEncodingException(getClass().getSimpleName() +
+ throw new UnsupportedEncodingException(getClass().getSimpleName() +
" PDU does not support requested protocol version serialization: " +
reqProtocolVersion);
}
}
/**
- * Get the byte array representation of this Message. This uses the version
- * 1 of the replication protocol (used for compatibility purpose).
- *
- * @return The byte array representation of this Message.
- *
- * @throws UnsupportedEncodingException When the encoding of the message
- * failed because the UTF-8 encoding is not supported.
- */
- public abstract byte[] getBytes_V1() throws UnsupportedEncodingException;
-
- /**
- * Encode the common header for all the UpdateMessage. This uses the version
- * 1 of the replication protocol (used for compatibility purpose).
- *
- * @param type the type of UpdateMessage to encode.
- * @param additionalLength additional length needed to encode the remaining
- * part of the UpdateMessage.
- * @return a byte array containing the common header and enough space to
- * encode the remaining bytes of the UpdateMessage as was specified
- * by the additionalLength.
- * (byte array length = common header length + additionalLength)
- * @throws UnsupportedEncodingException if UTF-8 is not supported.
- */
- public byte[] encodeHeader_V1(byte type, int additionalLength)
- throws UnsupportedEncodingException
- {
- byte[] byteDn = dn.getBytes("UTF-8");
- byte[] changeNumberByte =
- this.getChangeNumber().toString().getBytes("UTF-8");
- byte[] byteEntryuuid = getUniqueId().getBytes("UTF-8");
-
- /* The message header is stored in the form :
- * <operation type>changenumber><dn><assured><entryuuid><change>
- * the length of result byte array is therefore :
- * 1 + change number length + 1 + dn length + 1 + 1 +
- * uuid length + 1 + additional_length
- */
- int length = 5 + changeNumberByte.length + byteDn.length
- + byteEntryuuid.length + additionalLength;
-
- byte[] encodedMsg = new byte[length];
-
- /* put the type of the operation */
- encodedMsg[0] = type;
- int pos = 1;
-
- /* put the ChangeNumber */
- pos = addByteArray(changeNumberByte, encodedMsg, pos);
-
- /* put the assured information */
- encodedMsg[pos++] = (assuredFlag ? (byte) 1 : 0);
-
- /* put the DN and a terminating 0 */
- pos = addByteArray(byteDn, encodedMsg, pos);
-
- /* put the entry uuid and a terminating 0 */
- pos = addByteArray(byteEntryuuid, encodedMsg, pos);
-
- return encodedMsg;
- }
-
- /**
- * Decode the Header part of this Update Message, and check its type.
- *
- * @param types The allowed types of this Update Message.
- * @param encodedMsg the encoded form of the UpdateMsg.
- * @return the position at which the remaining part of the message starts.
- * @throws DataFormatException if the encodedMsg does not contain a valid
- * common header.
- */
- public int decodeHeader(byte[] types, byte[] encodedMsg)
- throws DataFormatException
- {
- /* The message header is stored in the form :
- * <operation type><protocol version><changenumber><dn><entryuuid><assured>
- * <assured mode> <safe data level>
- */
-
- /* first byte is the type */
- boolean foundMatchingType = false;
- for (int i = 0; i < types.length; i++)
- {
- if (types[i] == encodedMsg[0])
- {
- foundMatchingType = true;
- break;
- }
- }
- if (!foundMatchingType)
- throw new DataFormatException("byte[] is not a valid update msg: "
- + encodedMsg[0]);
-
- /*
- * For older protocol version PDUs, decode the matching version header
- * instead.
- */
- if ((encodedMsg[0] == MSG_TYPE_ADD_V1) ||
- (encodedMsg[0] == MSG_TYPE_DELETE_V1) ||
- (encodedMsg[0] == MSG_TYPE_MODIFYDN_V1) ||
- (encodedMsg[0] == MSG_TYPE_MODIFY_V1))
- {
- return decodeHeader_V1(encodedMsg);
- }
-
- /* read the protocol version */
- protocolVersion = (short)encodedMsg[1];
-
- try
- {
- /* Read the changeNumber */
- int pos = 2;
- int length = getNextLength(encodedMsg, pos);
- String changenumberStr = new String(encodedMsg, pos, length, "UTF-8");
- pos += length + 1;
- changeNumber = new ChangeNumber(changenumberStr);
-
- /* Read the dn */
- length = getNextLength(encodedMsg, pos);
- dn = new String(encodedMsg, pos, length, "UTF-8");
- pos += length + 1;
-
- /* Read the entryuuid */
- length = getNextLength(encodedMsg, pos);
- uniqueId = new String(encodedMsg, pos, length, "UTF-8");
- pos += length + 1;
-
- /* Read the assured information */
- if (encodedMsg[pos++] == 1)
- assuredFlag = true;
- else
- assuredFlag = false;
-
- /* Read the assured mode */
- assuredMode = AssuredMode.valueOf(encodedMsg[pos++]);
-
- /* Read the safe data level */
- safeDataLevel = encodedMsg[pos++];
-
- return pos;
- } catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- } catch (IllegalArgumentException e)
- {
- throw new DataFormatException(e.getMessage());
- }
- }
-
- /**
- * Decode the Header part of this Update Message, and check its type. This
- * uses the version 1 of the replication protocol (used for compatibility
- * purpose).
- *
- * @param encodedMsg the encoded form of the UpdateMessage.
- * @return the position at which the remaining part of the message starts.
- * @throws DataFormatException if the encodedMsg does not contain a valid
- * common header.
- */
- public int decodeHeader_V1(byte[] encodedMsg)
- throws DataFormatException
- {
- if ((encodedMsg[0] != MSG_TYPE_ADD_V1) &&
- (encodedMsg[0] != MSG_TYPE_DELETE_V1) &&
- (encodedMsg[0] != MSG_TYPE_MODIFYDN_V1) &&
- (encodedMsg[0] != MSG_TYPE_MODIFY_V1))
- throw new DataFormatException("byte[] is not a valid update msg: expected"
- + " a V1 PDU, received: " + encodedMsg[0]);
-
- // Force version to V1 (other new parameters take their default values
- // (assured stuff...))
- protocolVersion = ProtocolVersion.REPLICATION_PROTOCOL_V1;
-
- try
- {
- /* read the changeNumber */
- int pos = 1;
- int length = getNextLength(encodedMsg, pos);
- String changenumberStr = new String(encodedMsg, pos, length, "UTF-8");
- pos += length + 1;
- changeNumber = new ChangeNumber(changenumberStr);
-
- /* read the assured information */
- if (encodedMsg[pos++] == 1)
- assuredFlag = true;
- else
- assuredFlag = false;
-
- /* read the dn */
- length = getNextLength(encodedMsg, pos);
- dn = new String(encodedMsg, pos, length, "UTF-8");
- pos += length + 1;
-
- /* read the entryuuid */
- length = getNextLength(encodedMsg, pos);
- uniqueId = new String(encodedMsg, pos, length, "UTF-8");
- pos += length + 1;
-
- return pos;
- } catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
- }
-
- /**
* Get the assured mode in this message.
* @return The assured mode in this message
*/
@@ -626,5 +261,144 @@
*
* @return The number of bytes used by this message.
*/
- public abstract int size();
+ public int size()
+ {
+ return 10 + payload.length;
+ }
+
+ /**
+ * Encode the common header for all the UpdateMsg. This uses the current
+ * protocol version.
+ *
+ * @param type the type of UpdateMsg to encode.
+ * @param additionalLength additional length needed to encode the remaining
+ * part of the UpdateMsg.
+ * @return a byte array containing the common header and enough space to
+ * encode the remaining bytes of the UpdateMsg as was specified
+ * by the additionalLength.
+ * (byte array length = common header length + additionalLength)
+ * @throws UnsupportedEncodingException if UTF-8 is not supported.
+ */
+ protected byte[] encodeHeader(byte type, int additionalLength)
+ throws UnsupportedEncodingException
+ {
+ byte[] changeNumberByte =
+ this.getChangeNumber().toString().getBytes("UTF-8");
+
+ /* The message header is stored in the form :
+ * <operation type><protocol version><changenumber><assured>
+ * <assured mode> <safe data level>
+ * the length of result byte array is therefore :
+ * 1 + 1 + change number length + 1 + 1
+ * + 1 + 1 + additional_length
+ */
+ int length = 6 + changeNumberByte.length + additionalLength;
+
+ byte[] encodedMsg = new byte[length];
+
+ /* put the type of the operation */
+ encodedMsg[0] = type;
+
+ /* put the protocol version */
+ encodedMsg[1] = (byte)ProtocolVersion.getCurrentVersion();
+ int pos = 2;
+
+ /* Put the ChangeNumber */
+ pos = addByteArray(changeNumberByte, encodedMsg, pos);
+
+ /* Put the assured flag */
+ encodedMsg[pos++] = (assuredFlag ? (byte) 1 : 0);
+
+ /* Put the assured mode */
+ encodedMsg[pos++] = assuredMode.getValue();
+
+ /* Put the safe data level */
+ encodedMsg[pos++] = safeDataLevel;
+
+ return encodedMsg;
+ }
+
+ /**
+ * Decode the Header part of this Update Message, and check its type.
+ *
+ * @param type The allowed type of this Update Message.
+ * @param encodedMsg the encoded form of the UpdateMsg.
+ * @return the position at which the remaining part of the message starts.
+ * @throws DataFormatException if the encodedMsg does not contain a valid
+ * common header.
+ */
+ protected int decodeHeader(byte type, byte[] encodedMsg)
+ throws DataFormatException
+ {
+ /* The message header is stored in the form :
+ * <operation type><protocol version><changenumber><assured>
+ * <assured mode> <safe data level>
+ */
+ if (!(type == encodedMsg[0]))
+ throw new DataFormatException("byte[] is not a valid update msg: "
+ + encodedMsg[0]);
+
+ /* read the protocol version */
+ protocolVersion = (short)encodedMsg[1];
+
+ try
+ {
+ /* Read the changeNumber */
+ int pos = 2;
+ int length = getNextLength(encodedMsg, pos);
+ String changenumberStr = new String(encodedMsg, pos, length, "UTF-8");
+ pos += length + 1;
+ changeNumber = new ChangeNumber(changenumberStr);
+
+ /* Read the assured information */
+ if (encodedMsg[pos++] == 1)
+ assuredFlag = true;
+ else
+ assuredFlag = false;
+
+ /* Read the assured mode */
+ assuredMode = AssuredMode.valueOf(encodedMsg[pos++]);
+
+ /* Read the safe data level */
+ safeDataLevel = encodedMsg[pos++];
+
+ return pos;
+ } catch (UnsupportedEncodingException e)
+ {
+ throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ } catch (IllegalArgumentException e)
+ {
+ throw new DataFormatException(e.getMessage());
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes() throws UnsupportedEncodingException
+ {
+ /* Encode the header in a byte[] large enough to also contain the payload */
+ byte [] resultByteArray =
+ encodeHeader(MSG_TYPE_GENERIC_UPDATE, payload.length);
+
+ int pos = resultByteArray.length - payload.length;
+
+ /* Add the payload */
+ for (int i=0; i<payload.length; i++,pos++)
+ {
+ resultByteArray[pos] = payload[i];
+ }
+ return resultByteArray;
+ }
+
+ /**
+ * Get the payload of the UpdateMsg.
+ *
+ * @return The payload of the UpdateMsg.
+ */
+ public byte[] getPayload()
+ {
+ return payload;
+ }
}
--
Gitblit v1.10.0