From 88cfe5045d77d433ce02b0ef10ee84c9d4fb15e2 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 23 May 2014 15:17:15 +0000
Subject: [PATCH] (CR-3599) Convert all protocols message to use ByteArrayBuilder + ByteArrayScanner
---
opends/src/server/org/opends/server/replication/protocol/InitializeRcvAckMsg.java | 90
opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java | 194 -
opends/src/server/org/opends/server/replication/protocol/StartMsg.java | 224 -
opends/src/server/org/opends/server/replication/protocol/ChangeStatusMsg.java | 53
opends/src/server/org/opends/server/replication/protocol/AckMsg.java | 137 -
opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java | 364 --
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java | 138 -
opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java | 207 -
opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java | 64
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ByteArrayTest.java | 193 +
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java | 7
opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java | 201 -
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java | 199 -
opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java | 86
opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java | 76
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java | 16
opends/src/server/org/opends/server/replication/protocol/DoneMsg.java | 68
opends/src/server/org/opends/server/replication/protocol/EntryMsg.java | 128
opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java | 255 -
opends/src/server/org/opends/server/replication/protocol/WindowMsg.java | 69
opends/src/server/org/opends/server/replication/common/ServerState.java | 130 -
opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java | 143 -
opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java | 344 --
opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java | 228 -
opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java | 131
opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java | 163 -
opends/src/server/org/opends/server/replication/protocol/AddMsg.java | 240 -
opends/src/server/org/opends/server/replication/protocol/ByteArrayBuilder.java | 114 +
opends/src/server/org/opends/server/replication/protocol/ResetGenerationIdMsg.java | 61
opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java | 388 +--
opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java | 166 -
opends/src/server/org/opends/server/replication/protocol/ByteArrayScanner.java | 151 +
opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java | 111
opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java | 387 ---
/dev/null | 80
opends/src/server/org/opends/server/replication/protocol/HeartbeatMsg.java | 21
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java | 7
opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java | 127
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 13
opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java | 156 -
40 files changed, 1,682 insertions(+), 4,248 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/common/ServerState.java b/opends/src/server/org/opends/server/replication/common/ServerState.java
index 81cb59d..73d9714 100644
--- a/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -22,17 +22,14 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.common;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.util.*;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.zip.DataFormatException;
import org.opends.server.protocols.asn1.ASN1Writer;
import org.opends.server.replication.protocol.ProtocolVersion;
@@ -75,70 +72,6 @@
serverIdToCSN.clear();
}
-
- /**
- * Creates a new ServerState object from its encoded form.
- *
- * @param in The byte array containing the encoded ServerState form.
- * @param pos The position in the byte array where the encoded ServerState
- * starts.
- * @param endpos The position in the byte array where the encoded ServerState
- * ends.
- * @throws DataFormatException If the encoded form was not correct.
- */
- public ServerState(byte[] in, int pos, int endpos) throws DataFormatException
- {
- try
- {
- while (endpos > pos)
- {
- // FIXME JNR: why store the serverId separately from the CSN since the
- // CSN already contains the serverId?
-
- // read the ServerId
- int length = getNextLength(in, pos);
- String serverIdString = new String(in, pos, length, "UTF-8");
- int serverId = Integer.valueOf(serverIdString);
- pos += length +1;
-
- // read the CSN
- length = getNextLength(in, pos);
- String csnString = new String(in, pos, length, "UTF-8");
- CSN csn = new CSN(csnString);
- pos += length +1;
-
- // Add the serverId
- serverIdToCSN.put(serverId, csn);
- }
- } catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
- }
-
- /**
- * Get the length of the next String encoded in the in byte array.
- * This method is used to cut the different parts (serverIds, CSN)
- * of a server state.
- *
- * @param in the byte array where to calculate the string.
- * @param pos the position where to start from in the byte array.
- * @return the length of the next string.
- * @throws DataFormatException If the byte array does not end with null.
- */
- private int getNextLength(byte[] in, int pos) throws DataFormatException
- {
- int offset = pos;
- int length = 0;
- while (in[offset++] != 0)
- {
- if (offset >= in.length)
- throw new DataFormatException("byte[] is not a valid server state");
- length++;
- }
- return length;
- }
-
/**
* Forward update the Server State with a CSN. The provided CSN will be put on
* the current object only if it is newer than the existing CSN for the same
@@ -151,7 +84,9 @@
public boolean update(CSN csn)
{
if (csn == null)
+ {
return false;
+ }
saved = false;
@@ -191,7 +126,9 @@
public boolean update(ServerState serverState)
{
if (serverState == null)
+ {
return false;
+ }
boolean updated = false;
for (CSN csn : serverState.serverIdToCSN.values())
@@ -215,7 +152,9 @@
public boolean removeCSN(CSN expectedCSN)
{
if (expectedCSN == null)
+ {
return false;
+ }
if (serverIdToCSN.remove(expectedCSN.getServerId(), expectedCSN))
{
@@ -335,63 +274,18 @@
}
/**
- * Add the tail into resultByteArray at position pos.
- */
- private int addByteArray(byte[] tail, byte[] resultByteArray, int pos)
- {
- for (int i=0; i<tail.length; i++,pos++)
- {
- resultByteArray[pos] = tail[i];
- }
- resultByteArray[pos++] = 0;
- return pos;
- }
-
- /**
- * Encode this ServerState object and return its byte array representation.
+ * Returns a copy of this ServerState's content as a Map of serverId => CSN.
*
- * @return a byte array with an encoded representation of this object.
- * @throws UnsupportedEncodingException if UTF8 is not supported by the JVM.
+ * @return a copy of this ServerState's content as a Map of serverId => CSN.
*/
- public byte[] getBytes() throws UnsupportedEncodingException
+ public Map<Integer, CSN> getServerIdToCSNMap()
{
// copy to protect from concurrent updates
// that could change the number of elements in the Map
- final Map<Integer, CSN> copy = new HashMap<Integer, CSN>(serverIdToCSN);
-
- final int size = copy.size();
- List<String> idList = new ArrayList<String>(size);
- List<String> csnList = new ArrayList<String>(size);
- // calculate the total length needed to allocate byte array
- int length = 0;
- for (Entry<Integer, CSN> entry : copy.entrySet())
- {
- // serverId is useless, see comment in ServerState ctor
- final String serverIdStr = String.valueOf(entry.getKey());
- idList.add(serverIdStr);
- length += serverIdStr.length() + 1;
-
- final String csnStr = entry.getValue().toString();
- csnList.add(csnStr);
- length += csnStr.length() + 1;
- }
- byte[] result = new byte[length];
-
- // write the server state into the byte array
- int pos = 0;
- for (int i = 0; i < size; i++)
- {
- String str = idList.get(i);
- pos = addByteArray(str.getBytes("UTF-8"), result, pos);
- str = csnList.get(i);
- pos = addByteArray(str.getBytes("UTF-8"), result, pos);
- }
- return result;
+ return new HashMap<Integer, CSN>(serverIdToCSN);
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public Iterator<CSN> iterator()
{
diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index ba1e689..d3ec86d 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -26,7 +26,10 @@
*/
package org.opends.server.replication.plugin;
-import java.io.*;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.StringReader;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -59,7 +62,10 @@
import org.opends.server.protocols.ldap.LDAPControl;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.protocols.ldap.LDAPModification;
-import org.opends.server.replication.common.*;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.common.ServerStatus;
+import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
@@ -2063,9 +2069,6 @@
{
msg.encode();
pendingChanges.commitAndPushCommittedChanges(curCSN, msg);
- } catch (UnsupportedEncodingException e)
- {
- // will be caught at publish time.
}
catch (NoSuchElementException e)
{
diff --git a/opends/src/server/org/opends/server/replication/protocol/AckMsg.java b/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
index ea73899..853d589 100644
--- a/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
@@ -22,13 +22,10 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2013 ForgeRock AS.
+ * Portions copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.DataFormatException;
@@ -60,7 +57,7 @@
public class AckMsg extends ReplicationMsg
{
/** CSN of the update that was acked. */
- private CSN csn;
+ private final CSN csn;
/**
* Did some servers go in timeout when the matching update (corresponding to
@@ -159,50 +156,28 @@
* @throws DataFormatException If in does not contain a properly encoded
* AckMsg.
*/
- public AckMsg(byte[] in) throws DataFormatException
+ AckMsg(byte[] in) throws DataFormatException
{
- try
+ /*
+ * The message is stored in the form:
+ * <operation type><CSN><has timeout><has degraded><has replay
+ * error><failed server ids>
+ */
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ final byte msgType = scanner.nextByte();
+ if (msgType != MSG_TYPE_ACK)
{
- /*
- * The message is stored in the form:
- * <operation type><CSN><has timeout><has degraded><has replay
- * error><failed server ids>
- */
+ throw new DataFormatException("byte[] is not a valid modify msg");
+ }
- // First byte is the type
- if (in[0] != MSG_TYPE_ACK)
- {
- throw new DataFormatException("byte[] is not a valid modify msg");
- }
- int pos = 1;
+ csn = scanner.nextCSNUTF8();
+ hasTimeout = scanner.nextBoolean();
+ hasWrongStatus = scanner.nextBoolean();
+ hasReplayError = scanner.nextBoolean();
- // Read the CSN
- int length = getNextLength(in, pos);
- String csnStr = new String(in, pos, length, "UTF-8");
- csn = new CSN(csnStr);
- pos += length + 1;
-
- // Read the hasTimeout flag
- hasTimeout = in[pos++] == 1;
-
- // Read the hasWrongStatus flag
- hasWrongStatus = in[pos++] == 1;
-
- // Read the hasReplayError flag
- hasReplayError = in[pos++] == 1;
-
- // Read the list of failed server ids
- while (pos < in.length)
- {
- length = getNextLength(in, pos);
- String serverIdString = new String(in, pos, length, "UTF-8");
- Integer serverId = Integer.valueOf(serverIdString);
- failedServers.add(serverId);
- pos += length + 1;
- }
- } catch (UnsupportedEncodingException e)
+ while (!scanner.isEmpty())
{
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ failedServers.add(scanner.nextIntUTF8());
}
}
@@ -216,53 +191,26 @@
return csn;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte[] getBytes(short protocolVersion)
{
- try
+ /*
+ * The message is stored in the form:
+ * <operation type><CSN><has timeout><has degraded><has replay
+ * error><failed server ids>
+ */
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ builder.append(MSG_TYPE_ACK);
+ builder.appendUTF8(csn);
+ builder.append(hasTimeout);
+ builder.append(hasWrongStatus);
+ builder.append(hasReplayError);
+ for (int serverId : failedServers)
{
- /*
- * The message is stored in the form:
- * <operation type><CSN><has timeout><has degraded><has replay
- * error><failed server ids>
- */
-
- ByteArrayOutputStream oStream = new ByteArrayOutputStream(200);
-
- // Put the type of the operation
- oStream.write(MSG_TYPE_ACK);
-
- // Put the CSN
- byte[] csnByte = csn.toString().getBytes("UTF-8");
- oStream.write(csnByte);
- oStream.write(0);
-
- // Put the hasTimeout flag
- oStream.write(hasTimeout ? 1 : 0);
-
- // Put the hasWrongStatus flag
- oStream.write(hasWrongStatus ? 1 : 0);
-
- // Put the hasReplayError flag
- oStream.write(hasReplayError ? 1 : 0);
-
- // Put the list of server ids
- for (Integer sid : failedServers)
- {
- byte[] byteServerId = String.valueOf(sid).getBytes("UTF-8");
- oStream.write(byteServerId);
- oStream.write(0);
- }
-
- return oStream.toByteArray();
- } catch (IOException e)
- {
- // never happens
- return null;
+ builder.appendUTF8(serverId);
}
+ return builder.toByteArray();
}
/**
@@ -307,21 +255,8 @@
*/
public String errorsToString()
{
- String idList;
- if (failedServers.size() > 0)
- {
- idList = "[";
- int size = failedServers.size();
- for (int i=0 ; i<size ; i++) {
- idList += failedServers.get(i);
- if ( i != (size-1) )
- idList += ", ";
- }
- idList += "]";
- } else
- {
- idList="none";
- }
+ final String idList =
+ !failedServers.isEmpty() ? failedServers.toString() : "none";
return "hasTimeout: " + (hasTimeout ? "yes" : "no") + ", " +
"hasWrongStatus: " + (hasWrongStatus ? "yes" : "no") + ", " +
diff --git a/opends/src/server/org/opends/server/replication/protocol/AddMsg.java b/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
index 4e42a6f..fea3de7 100644
--- a/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -26,7 +26,6 @@
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -64,7 +63,7 @@
* Creates a new AddMessage.
* @param op the operation to use when creating the message
*/
- public AddMsg(PostOperationAddOperation op)
+ AddMsg(PostOperationAddOperation op)
{
super((AddContext) op.getAttachment(SYNCHROCONTEXT), op.getEntryDN());
@@ -143,22 +142,19 @@
*
* @param in The byte[] from which the operation must be read.
* @throws DataFormatException The input byte[] is not a valid AddMsg
- * @throws UnsupportedEncodingException If UTF8 is not supported by the jvm
*/
- public AddMsg(byte[] in) throws DataFormatException,
- UnsupportedEncodingException
+ public AddMsg(byte[] in) throws DataFormatException
{
- byte[] allowedPduTypes = new byte[2];
- allowedPduTypes[0] = MSG_TYPE_ADD;
- allowedPduTypes[1] = MSG_TYPE_ADD_V1;
- int pos = decodeHeader(allowedPduTypes, in);
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ decodeHeader(scanner, MSG_TYPE_ADD, MSG_TYPE_ADD_V1);
- // protocol version has been read as part of the header
if (protocolVersion <= 3)
- decodeBody_V123(in, pos);
+ {
+ decodeBody_V123(scanner);
+ }
else
{
- decodeBody_V4(in, pos);
+ decodeBody_V4(scanner);
}
if (protocolVersion==ProtocolVersion.getCurrentVersion())
{
@@ -189,122 +185,37 @@
/** {@inheritDoc} */
@Override
- public byte[] getBytes_V1() throws UnsupportedEncodingException
+ public byte[] getBytes_V1()
{
- int bodyLength = encodedAttributes.length;
- byte[] byteParentId = null;
- if (parentEntryUUID != null)
- {
- byteParentId = parentEntryUUID.getBytes("UTF-8");
- bodyLength += byteParentId.length + 1;
- }
- else
- {
- bodyLength += 1;
- }
-
- /* encode the header in a byte[] large enough to also contain the mods */
- byte [] resultByteArray = encodeHeader_V1(MSG_TYPE_ADD_V1, bodyLength);
-
- int pos = resultByteArray.length - bodyLength;
-
- if (byteParentId != null)
- pos = addByteArray(byteParentId, resultByteArray, pos);
- else
- resultByteArray[pos++] = 0;
-
- /* put the attributes */
- for (int i=0; i<encodedAttributes.length; i++,pos++)
- {
- resultByteArray[pos] = encodedAttributes[i];
- }
- return resultByteArray;
+ final ByteArrayBuilder builder = encodeHeader_V1(MSG_TYPE_ADD_V1);
+ builder.append(parentEntryUUID);
+ builder.append(encodedAttributes);
+ return builder.toByteArray();
}
/** {@inheritDoc} */
@Override
- public byte[] getBytes_V23() throws UnsupportedEncodingException
+ public byte[] getBytes_V23()
{
- // Put together the different encoded pieces
- int bodyLength = encodedAttributes.length;
-
- // Compute the total length of the body
- byte[] byteParentId = null;
- if (parentEntryUUID != null)
- {
- // Encode parentID now to get the length of the encoded bytes
- byteParentId = parentEntryUUID.getBytes("UTF-8");
- bodyLength += byteParentId.length + 1;
- }
- else
- {
- bodyLength += 1;
- }
-
- /* encode the header in a byte[] large enough to also contain the mods */
- byte [] resultByteArray = encodeHeader(MSG_TYPE_ADD, bodyLength,
- ProtocolVersion.REPLICATION_PROTOCOL_V3);
-
- int pos = resultByteArray.length - bodyLength;
-
- if (byteParentId != null)
- pos = addByteArray(byteParentId, resultByteArray, pos);
- else
- resultByteArray[pos++] = 0;
-
- /* put the attributes */
- for (int i=0; i<encodedAttributes.length; i++,pos++)
- {
- resultByteArray[pos] = encodedAttributes[i];
- }
- return resultByteArray;
+ final ByteArrayBuilder builder =
+ encodeHeader(MSG_TYPE_ADD, ProtocolVersion.REPLICATION_PROTOCOL_V3);
+ builder.append(parentEntryUUID);
+ builder.append(encodedAttributes);
+ return builder.toByteArray();
}
/** {@inheritDoc} */
@Override
- public byte[] getBytes_V45(short reqProtocolVersion)
- throws UnsupportedEncodingException
+ public byte[] getBytes_V45(short protocolVersion)
{
- // Put together the different encoded pieces
- int bodyLength = 0;
-
- // Compute the total length of the body
- byte[] byteParentId = null;
- if (parentEntryUUID != null)
- {
- // Encode parentID now to get the length of the encoded bytes
- byteParentId = parentEntryUUID.getBytes("UTF-8");
- bodyLength += byteParentId.length + 1;
- }
- else
- {
- bodyLength += 1;
- }
-
- byte[] byteAttrLen =
- String.valueOf(encodedAttributes.length).getBytes("UTF-8");
- bodyLength += byteAttrLen.length + 1;
- bodyLength += encodedAttributes.length + 1;
-
- byte[] byteEntryAttrLen =
- String.valueOf(encodedEclIncludes.length).getBytes("UTF-8");
- bodyLength += byteEntryAttrLen.length + 1;
- bodyLength += encodedEclIncludes.length + 1;
-
- /* encode the header in a byte[] large enough to also contain the mods */
- byte [] encodedMsg = encodeHeader(MSG_TYPE_ADD, bodyLength,
- reqProtocolVersion);
-
- int pos = encodedMsg.length - bodyLength;
- if (byteParentId != null)
- pos = addByteArray(byteParentId, encodedMsg, pos);
- else
- encodedMsg[pos++] = 0;
- pos = addByteArray(byteAttrLen, encodedMsg, pos);
- pos = addByteArray(encodedAttributes, encodedMsg, pos);
- pos = addByteArray(byteEntryAttrLen, encodedMsg, pos);
- pos = addByteArray(encodedEclIncludes, encodedMsg, pos);
- return encodedMsg;
+ final ByteArrayBuilder builder =
+ encodeHeader(MSG_TYPE_ADD, protocolVersion);
+ builder.append(parentEntryUUID);
+ builder.appendUTF8(encodedAttributes.length);
+ builder.appendZeroTerminated(encodedAttributes);
+ builder.appendUTF8(encodedEclIncludes.length);
+ builder.appendZeroTerminated(encodedEclIncludes);
+ return builder.toByteArray();
}
private byte[] encodeAttributes(
@@ -368,11 +279,17 @@
new LDAPAttribute(objectClass).write(writer);
for (Attribute a : userAttributes)
+ {
new LDAPAttribute(a).write(writer);
+ }
if (operationalAttributes != null)
+ {
for (Attribute a : operationalAttributes)
+ {
new LDAPAttribute(a).write(writer);
+ }
+ }
}
catch(Exception e)
{
@@ -385,89 +302,24 @@
// Msg decoding
// ============
- private void decodeBody_V123(byte[] in, int pos)
- throws DataFormatException, UnsupportedEncodingException
+ private void decodeBody_V123(ByteArrayScanner scanner)
+ throws DataFormatException
{
- // read the parent unique Id
- int length = getNextLength(in, pos);
- if (length != 0)
- {
- parentEntryUUID = new String(in, pos, length, "UTF-8");
- pos += length + 1;
- }
- else
- {
- parentEntryUUID = null;
- pos += 1;
- }
-
- // Read/Don't decode attributes : all the remaining bytes
- encodedAttributes = new byte[in.length-pos];
- int i =0;
- while (pos<in.length)
- {
- encodedAttributes[i++] = in[pos++];
- }
+ parentEntryUUID = scanner.nextString();
+ encodedAttributes = scanner.remainingBytes();
}
- private void decodeBody_V4(byte[] in, int pos)
- throws DataFormatException, UnsupportedEncodingException
+ private void decodeBody_V4(ByteArrayScanner scanner)
+ throws DataFormatException
{
- // read the parent unique Id
- int length = getNextLength(in, pos);
- if (length != 0)
- {
- parentEntryUUID = new String(in, pos, length, "UTF-8");
- pos += length + 1;
- }
- else
- {
- parentEntryUUID = null;
- pos += 1;
- }
+ parentEntryUUID = scanner.nextString();
- // Read attr len
- length = getNextLength(in, pos);
- int attrLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
- pos += length + 1;
+ final int attrLen = scanner.nextIntUTF8();
+ encodedAttributes = scanner.nextByteArray(attrLen);
+ scanner.skipZeroSeparator();
- // Read/Don't decode attributes
- this.encodedAttributes = new byte[attrLen];
- try
- {
- System.arraycopy(in, pos, encodedAttributes, 0, attrLen);
- } catch (IndexOutOfBoundsException e)
- {
- throw new DataFormatException(e.getMessage());
- } catch (ArrayStoreException e)
- {
- throw new DataFormatException(e.getMessage());
- } catch (NullPointerException e)
- {
- throw new DataFormatException(e.getMessage());
- }
- pos += attrLen + 1;
-
- // Read ecl attr len
- length = getNextLength(in, pos);
- int eclAttrLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
- pos += length + 1;
-
- // Read/Don't decode entry attributes
- encodedEclIncludes = new byte[eclAttrLen];
- try
- {
- System.arraycopy(in, pos, encodedEclIncludes, 0, eclAttrLen);
- } catch (IndexOutOfBoundsException e)
- {
- throw new DataFormatException(e.getMessage());
- } catch (ArrayStoreException e)
- {
- throw new DataFormatException(e.getMessage());
- } catch (NullPointerException e)
- {
- throw new DataFormatException(e.getMessage());
- }
+ final int eclAttrLen = scanner.nextIntUTF8();
+ encodedEclIncludes = scanner.nextByteArray(eclAttrLen);
}
/** {@inheritDoc} */
diff --git a/opends/src/server/org/opends/server/replication/protocol/ByteArrayBuilder.java b/opends/src/server/org/opends/server/replication/protocol/ByteArrayBuilder.java
index 9431c35..faef4bf 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ByteArrayBuilder.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ByteArrayBuilder.java
@@ -26,9 +26,15 @@
import java.io.UnsupportedEncodingException;
import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.opends.server.protocols.asn1.ASN1;
+import org.opends.server.protocols.asn1.ASN1Writer;
import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.ServerState;
import org.opends.server.types.ByteStringBuilder;
+import org.opends.server.types.DN;
/**
* Byte array builder class encodes data into byte arrays to send messages over
@@ -42,8 +48,6 @@
public class ByteArrayBuilder
{
- /** This is the null byte, also known as zero byte. */
- public static final byte NULL_BYTE = 0;
private final ByteStringBuilder builder;
/**
@@ -51,7 +55,7 @@
*/
public ByteArrayBuilder()
{
- builder = new ByteStringBuilder();
+ builder = new ByteStringBuilder(256);
}
/**
@@ -165,7 +169,8 @@
*/
public ByteArrayBuilder appendStrings(Collection<String> col)
{
- append(col.size());
+ //append(int) would have been safer, but byte is compatible with legacy code
+ append((byte) col.size());
for (String s : col)
{
append(s);
@@ -174,23 +179,28 @@
}
/**
- * Append a String to this ByteArrayBuilder.
+ * Append a String with a zero separator to this ByteArrayBuilder,
+ * or only the zero separator if the string is null
+ * or if the string length is zero.
*
* @param s
- * the String to append.
+ * the String to append. Can be null.
* @return this ByteArrayBuilder
*/
public ByteArrayBuilder append(String s)
{
try
{
- append(s.getBytes("UTF-8"));
+ if (s != null && s.length() > 0)
+ {
+ append(s.getBytes("UTF-8"));
+ }
+ return appendZeroSeparator();
}
catch (UnsupportedEncodingException e)
{
throw new RuntimeException("Should never happen", e);
}
- return this;
}
/**
@@ -220,17 +230,93 @@
return this;
}
- private ByteArrayBuilder append(byte[] sBytes)
+ /**
+ * Append a DN to this ByteArrayBuilder by converting it to a String then
+ * encoding that string to a UTF-8 byte array.
+ *
+ * @param dn
+ * the DN to append.
+ * @return this ByteArrayBuilder
+ */
+ public ByteArrayBuilder append(DN dn)
{
- for (byte b : sBytes)
- {
- append(b);
- }
- append((byte) 0); // zero separator
+ append(dn.toString());
return this;
}
/**
+ * Append all the bytes from the byte array to this ByteArrayBuilder.
+ *
+ * @param bytes
+ * the byte array to append.
+ * @return this ByteArrayBuilder
+ */
+ public ByteArrayBuilder append(byte[] bytes)
+ {
+ builder.append(bytes);
+ return this;
+ }
+
+ /**
+ * Append all the bytes from the byte array to this ByteArrayBuilder
+ * and then append a final zero byte separator for compatibility
+ * with legacy implementations.
+ *
+ * @param bytes
+ * the byte array to append.
+ * @return this ByteArrayBuilder
+ */
+ public ByteArrayBuilder appendZeroTerminated(byte[] bytes)
+ {
+ builder.append(bytes);
+ return appendZeroSeparator();
+ }
+
+ private ByteArrayBuilder appendZeroSeparator()
+ {
+ builder.append((byte) 0);
+ return this;
+ }
+
+ /**
+ * Append the byte representation of a ServerState to this ByteArrayBuilder
+ * and then append a final zero byte separator.
+ * <p>
+ * Caution: ServerState MUST be the last field. Because ServerState can
+ * contain null character (string termination of serverId string ..) it cannot
+ * be decoded using {@link ByteArrayScanner#nextString()} like the other
+ * fields. The only way is to rely on the end of the input buffer: and that
+ * forces the ServerState to be the last field. This should be changed if we
+ * want to have more than one ServerState field.
+ *
+ * @param serverState
+ * the ServerState to append.
+ * @return this ByteArrayBuilder
+ */
+ public ByteArrayBuilder append(ServerState serverState)
+ {
+ final Map<Integer, CSN> serverIdToCSN = serverState.getServerIdToCSNMap();
+ for (Entry<Integer, CSN> entry : serverIdToCSN.entrySet())
+ {
+ // FIXME JNR: why append the serverId in addition to the CSN
+ // since the CSN already contains the serverId?
+ appendUTF8(entry.getKey()); // serverId
+ appendUTF8(entry.getValue()); // CSN
+ }
+ return appendZeroSeparator(); // stupid legacy zero separator
+ }
+
+ /**
+ * Returns a new ASN1Writer that will append bytes to this ByteArrayBuilder.
+ *
+ * @return a new ASN1Writer that will append bytes to this ByteArrayBuilder.
+ */
+ public ASN1Writer getASN1Writer()
+ {
+ return ASN1.getWriter(builder);
+ }
+
+ /**
* Converts the content of this ByteStringBuilder to a byte array.
*
* @return the content of this ByteStringBuilder converted to a byte array.
diff --git a/opends/src/server/org/opends/server/replication/protocol/ByteArrayScanner.java b/opends/src/server/org/opends/server/replication/protocol/ByteArrayScanner.java
index 374bf0b..f0a13ec 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ByteArrayScanner.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ByteArrayScanner.java
@@ -27,9 +27,14 @@
import java.util.Collection;
import java.util.zip.DataFormatException;
+import org.opends.server.protocols.asn1.ASN1;
+import org.opends.server.protocols.asn1.ASN1Reader;
import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.ServerState;
import org.opends.server.types.ByteSequenceReader;
import org.opends.server.types.ByteString;
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
/**
* Byte array scanner class helps decode data from byte arrays received via
@@ -44,6 +49,7 @@
{
private final ByteSequenceReader bytes;
+ private final byte[] byteArray;
/**
* Builds a ByteArrayScanner object that will read from the supplied byte
@@ -55,6 +61,7 @@
public ByteArrayScanner(byte[] bytes)
{
this.bytes = ByteString.wrap(bytes).asReader();
+ this.byteArray = bytes;
}
/**
@@ -172,7 +179,7 @@
/**
* Reads the next UTF8-encoded string.
*
- * @return the next UTF8-encoded string.
+ * @return the next UTF8-encoded string or null if the string length is zero
* @throws DataFormatException
* if no more data can be read from the input
*/
@@ -180,9 +187,15 @@
{
try
{
- final String s = bytes.getString(findZeroSeparator());
- bytes.skip(1); // skip the zero separator
- return s;
+ final int offset = findZeroSeparator();
+ if (offset > 0)
+ {
+ final String s = bytes.getString(offset);
+ skipZeroSeparator();
+ return s;
+ }
+ skipZeroSeparator();
+ return null;
}
catch (IndexOutOfBoundsException e)
{
@@ -220,7 +233,8 @@
public <TCol extends Collection<String>> TCol nextStrings(TCol output)
throws DataFormatException
{
- final int colSize = nextInt();
+ // nextInt() would have been safer, but byte is compatible with legacy code.
+ final int colSize = nextByte();
for (int i = 0; i < colSize; i++)
{
output.add(nextString());
@@ -269,6 +283,127 @@
}
/**
+ * Reads the next DN.
+ *
+ * @return the next DN.
+ * @throws DataFormatException
+ * if DN was incorrectly encoded or no more data can be read from
+ * the input
+ */
+ public DN nextDN() throws DataFormatException
+ {
+ try
+ {
+ return DN.decode(nextString());
+ }
+ catch (DirectoryException e)
+ {
+ throw new DataFormatException(e.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Return a new byte array containing all remaining bytes in this
+ * ByteArrayScanner.
+ *
+ * @return new byte array containing all remaining bytes
+ */
+ public byte[] remainingBytes()
+ {
+ final int length = byteArray.length - bytes.position();
+ return nextByteArray(length);
+ }
+
+ /**
+ * Return a new byte array containing all remaining bytes in this
+ * ByteArrayScanner bar the last one which is a zero terminated byte
+ * (compatible with legacy code).
+ *
+ * @return new byte array containing all remaining bytes bar the last one
+ */
+ public byte[] remainingBytesZeroTerminated()
+ {
+ /* do not copy stupid legacy zero separator */
+ final int length = byteArray.length - (bytes.position() + 1);
+ final byte[] result = nextByteArray(length);
+ bytes.skip(1); // ignore last (supposedly) zero byte
+ return result;
+ }
+
+ /**
+ * Return a new byte array containing the requested number of bytes.
+ *
+ * @param length
+ * the number of bytes to be read and copied to the new byte array.
+ * @return new byte array containing the requested number of bytes.
+ */
+ public byte[] nextByteArray(final int length)
+ {
+ final byte[] result = new byte[length];
+ System.arraycopy(byteArray, bytes.position(), result, 0, length);
+ bytes.skip(length);
+ return result;
+ }
+
+ /**
+ * Reads the next ServerState.
+ * <p>
+ * Caution: ServerState MUST be the last field (see
+ * {@link ByteArrayBuilder#append(ServerState)} javadoc).
+ *
+ * @return the next ServerState.
+ * @throws DataFormatException
+ * if ServerState was incorrectly encoded or no more data can be
+ * read from the input
+ * @see ByteArrayBuilder#append(ServerState)
+ */
+ public ServerState nextServerState() throws DataFormatException
+ {
+ final ServerState result = new ServerState();
+
+ final int maxPos = byteArray.length - 1 /* stupid legacy zero separator */;
+ while (bytes.position() < maxPos)
+ {
+ final int serverId = nextIntUTF8();
+ final CSN csn = nextCSNUTF8();
+ if (serverId != csn.getServerId())
+ {
+ throw new DataFormatException("Expected serverId=" + serverId
+ + " to be the same as serverId for CSN=" + csn);
+ }
+ result.update(csn);
+ }
+ skipZeroSeparator();
+ return result;
+ }
+
+ /**
+ * Skips the next byte and verifies it is effectively the zero separator.
+ *
+ * @throws DataFormatException
+ * if the next byte is not the zero separator.
+ */
+ public void skipZeroSeparator() throws DataFormatException
+ {
+ if (bytes.peek() != (byte) 0)
+ {
+ throw new DataFormatException("Expected a zero separator at position "
+ + bytes.position() + " but found byte " + bytes.peek());
+ }
+ bytes.skip(1);
+ }
+
+ /**
+ * Returns a new ASN1Reader that will read bytes from this ByteArrayScanner.
+ *
+ * @return a new ASN1Reader that will read bytes from this ByteArrayScanner.
+ */
+ public ASN1Reader getASN1Reader()
+ {
+ return ASN1.getReader(bytes);
+ }
+
+ /**
* Returns whether the scanner has more bytes to consume.
*
* @return true if the scanner has more bytes to consume, false otherwise.
@@ -278,4 +413,10 @@
return bytes.remaining() == 0;
}
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return bytes.toString();
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ChangeStatusMsg.java b/opends/src/server/org/opends/server/replication/protocol/ChangeStatusMsg.java
index f0a3b5c..3e55c30 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ChangeStatusMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ChangeStatusMsg.java
@@ -22,11 +22,12 @@
*
*
* Copyright 2008 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS.
+ * Portions copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
import java.util.zip.DataFormatException;
+
import org.opends.server.replication.common.ServerStatus;
/**
@@ -36,10 +37,10 @@
*/
public class ChangeStatusMsg extends ReplicationMsg
{
- // The status we want the DS to enter (used when from RS to DS)
- private ServerStatus requestedStatus = ServerStatus.INVALID_STATUS;
- // The new status the DS just entered (used when from DS to RS)
- private ServerStatus newStatus = ServerStatus.INVALID_STATUS;
+ /** The status we want the DS to enter (used when from RS to DS) */
+ private final ServerStatus requestedStatus;
+ /** The new status the DS just entered (used when from DS to RS) */
+ private ServerStatus newStatus;
/**
* Create a new ChangeStatusMsg.
@@ -61,25 +62,19 @@
* @throws DataFormatException If the byte array does not contain a valid
* encoded form of the ChangeStatusMsg.
*/
- public ChangeStatusMsg(byte[] encodedMsg) throws DataFormatException
+ ChangeStatusMsg(byte[] encodedMsg) throws DataFormatException
{
/*
* The message is stored in the form:
* <message type><requested status><new status>
*/
-
- /* First byte is the type */
- if (encodedMsg[0] != ReplicationMsg.MSG_TYPE_CHANGE_STATUS)
- {
- throw new DataFormatException("byte[] is not a valid msg");
- }
-
try
{
- /* Then the requested status */
+ if (encodedMsg[0] != ReplicationMsg.MSG_TYPE_CHANGE_STATUS)
+ {
+ throw new DataFormatException("byte[] is not a valid msg");
+ }
requestedStatus = ServerStatus.valueOf(encodedMsg[1]);
-
- /* Then the new status */
newStatus = ServerStatus.valueOf(encodedMsg[2]);
} catch (IllegalArgumentException e)
{
@@ -87,9 +82,7 @@
}
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte[] getBytes(short protocolVersion)
{
@@ -97,18 +90,12 @@
* The message is stored in the form:
* <message type><requested status><new status>
*/
- byte[] encodedMsg = new byte[3];
-
- /* Put the type of the operation */
- encodedMsg[0] = ReplicationMsg.MSG_TYPE_CHANGE_STATUS;
-
- /* Put the requested status */
- encodedMsg[1] = requestedStatus.getValue();
-
- /* Put the requested status */
- encodedMsg[2] = newStatus.getValue();
-
- return encodedMsg;
+ return new byte[]
+ {
+ ReplicationMsg.MSG_TYPE_CHANGE_STATUS,
+ requestedStatus.getValue(),
+ newStatus.getValue()
+ };
}
/**
@@ -129,9 +116,7 @@
return newStatus;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public String toString()
{
diff --git a/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java b/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
index 098cd4e..1f766e8 100644
--- a/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
@@ -26,7 +26,6 @@
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.controls.SubtreeDeleteControl;
@@ -54,14 +53,14 @@
*
* @param operation the Operation from which the message must be created.
*/
- public DeleteMsg(PostOperationDeleteOperation operation)
+ DeleteMsg(PostOperationDeleteOperation operation)
{
super((OperationContext) operation.getAttachment(SYNCHROCONTEXT),
operation.getEntryDN());
try
{
- if (operation.getRequestControl(SubtreeDeleteControl.DECODER) != null)
- isSubtreeDelete = true;
+ isSubtreeDelete =
+ operation.getRequestControl(SubtreeDeleteControl.DECODER) != null;
}
catch(Exception e)
{/* do nothing */}
@@ -84,19 +83,16 @@
*
* @param in The byte[] from which the operation must be read.
* @throws DataFormatException The input byte[] is not a valid DeleteMsg
- * @throws UnsupportedEncodingException If UTF8 is not supported by the jvm
*/
- public DeleteMsg(byte[] in) throws DataFormatException,
- UnsupportedEncodingException
+ DeleteMsg(byte[] in) throws DataFormatException
{
- byte[] allowedPduTypes = new byte[2];
- allowedPduTypes[0] = MSG_TYPE_DELETE;
- allowedPduTypes[1] = MSG_TYPE_DELETE_V1;
- int pos = decodeHeader(allowedPduTypes, in);
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ decodeHeader(scanner, MSG_TYPE_DELETE, MSG_TYPE_DELETE_V1);
- // protocol version has been read as part of the header
if (protocolVersion >= 4)
- decodeBody_V4(in, pos);
+ {
+ decodeBody_V4(scanner);
+ }
else
{
// Keep the previous protocol version behavior - when we don't know the
@@ -115,7 +111,9 @@
InternalClientConnection.nextMessageID(), null, newDN);
if (isSubtreeDelete)
+ {
del.addRequestControl(new SubtreeDeleteControl(false));
+ }
DeleteContext ctx = new DeleteContext(getCSN(), getEntryUUID());
del.setAttachment(SYNCHROCONTEXT, ctx);
@@ -128,108 +126,47 @@
/** {@inheritDoc} */
@Override
- public byte[] getBytes_V1() throws UnsupportedEncodingException
+ public byte[] getBytes_V1()
{
- return encodeHeader_V1(MSG_TYPE_DELETE_V1, 0);
+ return encodeHeader_V1(MSG_TYPE_DELETE_V1)
+ .toByteArray();
}
/** {@inheritDoc} */
@Override
- public byte[] getBytes_V23() throws UnsupportedEncodingException
+ public byte[] getBytes_V23()
{
- return encodeHeader(MSG_TYPE_DELETE, 0,
- ProtocolVersion.REPLICATION_PROTOCOL_V3);
+ return encodeHeader(MSG_TYPE_DELETE,ProtocolVersion.REPLICATION_PROTOCOL_V3)
+ .toByteArray();
}
/** {@inheritDoc} */
@Override
- public byte[] getBytes_V45(short reqProtocolVersion)
- throws UnsupportedEncodingException
+ public byte[] getBytes_V45(short protocolVersion)
{
- // Put together the different encoded pieces
- int bodyLength = 0;
-
- byte[] byteEntryAttrLen =
- String.valueOf(encodedEclIncludes.length).getBytes("UTF-8");
-
- bodyLength += byteEntryAttrLen.length + 1;
- bodyLength += encodedEclIncludes.length + 1;
- byte[] byteInitiatorsName = null;
- if (initiatorsName != null)
- {
- byteInitiatorsName = initiatorsName.getBytes("UTF-8");
- bodyLength += byteInitiatorsName.length + 1;
- }
- else
- {
- bodyLength++;
- }
- // subtree flag
- bodyLength++;
-
- /* encode the header in a byte[] large enough to also contain the mods */
- byte [] encodedMsg = encodeHeader(MSG_TYPE_DELETE, bodyLength,
- reqProtocolVersion);
- int pos = encodedMsg.length - bodyLength;
- if (byteInitiatorsName != null)
- pos = addByteArray(byteInitiatorsName, encodedMsg, pos);
- else
- encodedMsg[pos++] = 0;
- pos = addByteArray(byteEntryAttrLen, encodedMsg, pos);
- pos = addByteArray(encodedEclIncludes, encodedMsg, pos);
-
- encodedMsg[pos++] = (byte) (isSubtreeDelete ? 1 : 0);
-
- return encodedMsg;
+ final ByteArrayBuilder builder =
+ encodeHeader(MSG_TYPE_DELETE, protocolVersion);
+ builder.append(initiatorsName);
+ builder.appendUTF8(encodedEclIncludes.length);
+ builder.appendZeroTerminated(encodedEclIncludes);
+ builder.append(isSubtreeDelete);
+ return builder.toByteArray();
}
// ============
// Msg decoding
// ============
- private void decodeBody_V4(byte[] in, int pos)
- throws DataFormatException, UnsupportedEncodingException
+ private void decodeBody_V4(ByteArrayScanner scanner)
+ throws DataFormatException
{
- int length = getNextLength(in, pos);
- if (length != 0)
- {
- initiatorsName = new String(in, pos, length, "UTF-8");
- pos += length + 1;
- }
- else
- {
- initiatorsName = null;
- pos += 1;
- }
+ initiatorsName = scanner.nextString();
- // Read ecl attr len
- length = getNextLength(in, pos);
- int eclAttrLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
- // Skip the length
- pos += length + 1;
+ final int eclAttrLen = scanner.nextIntUTF8();
+ encodedEclIncludes = scanner.nextByteArray(eclAttrLen);
+ scanner.skipZeroSeparator();
- // Read/Don't decode entry attributes
- encodedEclIncludes = new byte[eclAttrLen];
- try
- {
- // Copy ecl attr
- System.arraycopy(in, pos, encodedEclIncludes, 0, eclAttrLen);
- // Skip the attrs
- pos += eclAttrLen +1;
- } catch (IndexOutOfBoundsException e)
- {
- throw new DataFormatException(e.getMessage());
- } catch (ArrayStoreException e)
- {
- throw new DataFormatException(e.getMessage());
- } catch (NullPointerException e)
- {
- throw new DataFormatException(e.getMessage());
- }
-
- // subtree flag
- isSubtreeDelete = (in[pos] == 1);
-
+ isSubtreeDelete = scanner.nextBoolean();
}
/** {@inheritDoc} */
diff --git a/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java b/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java
index 06e37b2..e8d7b74 100644
--- a/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/DoneMsg.java
@@ -22,11 +22,10 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS.
+ * Portions copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
@@ -54,65 +53,26 @@
* @throws DataFormatException If the in does not contain a properly,
* encoded message.
*/
- public DoneMsg(byte[] in) throws DataFormatException
+ DoneMsg(byte[] in) throws DataFormatException
{
- super();
- try
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ final byte msgType = scanner.nextByte();
+ if (msgType != MSG_TYPE_DONE)
{
- // First byte is the type
- if (in[0] != MSG_TYPE_DONE)
- throw new DataFormatException("input is not a valid DoneMessage");
- int pos = 1;
-
- // sender
- int length = getNextLength(in, pos);
- String senderString = new String(in, pos, length, "UTF-8");
- this.senderID = Integer.valueOf(senderString);
- pos += length +1;
-
- // destination
- length = getNextLength(in, pos);
- String destinationString = new String(in, pos, length, "UTF-8");
- this.destination = Integer.valueOf(destinationString);
- pos += length +1;
-
- } catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ throw new DataFormatException("input is not a valid DoneMessage");
}
+ this.senderID = scanner.nextIntUTF8();
+ this.destination = scanner.nextIntUTF8();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte[] getBytes(short protocolVersion)
{
- try
- {
- byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
- byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
-
- int length = 1 + senderBytes.length + 1
- + destinationBytes.length + 1;
-
- byte[] resultByteArray = new byte[length];
-
- /* put the type of the operation */
- resultByteArray[0] = MSG_TYPE_DONE;
- int pos = 1;
-
- /* put the sender */
- pos = addByteArray(senderBytes, resultByteArray, pos);
-
- /* put the destination */
- pos = addByteArray(destinationBytes, resultByteArray, pos);
-
- return resultByteArray;
- }
- catch (UnsupportedEncodingException e)
- {
- return null;
- }
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ builder.append(MSG_TYPE_DONE);
+ builder.appendUTF8(senderID);
+ builder.appendUTF8(destination);
+ return builder.toByteArray();
}
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java b/opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
index fc43a71..2173135 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
@@ -22,11 +22,10 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.MultiDomainServerState;
@@ -73,56 +72,33 @@
* @param in The byte array containing the encoded form of the message.
* @throws DataFormatException If the byte array does not contain
* a valid encoded form of the message.
- * @throws UnsupportedEncodingException when it occurs.
* @throws NotSupportedOldVersionPDUException when it occurs.
*/
- public ECLUpdateMsg(byte[] in)
- throws DataFormatException,
- UnsupportedEncodingException,
- NotSupportedOldVersionPDUException
+ ECLUpdateMsg(byte[] in) throws DataFormatException,
+ NotSupportedOldVersionPDUException
{
try
{
- if (in[0] != MSG_TYPE_ECL_UPDATE)
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ if (scanner.nextByte() != MSG_TYPE_ECL_UPDATE)
{
throw new DataFormatException("byte[] is not a valid " +
getClass().getCanonicalName());
}
- int pos = 1;
- // Decode the cookie
- int length = getNextLength(in, pos);
- String cookieStr = new String(in, pos, length, "UTF-8");
- this.cookie = new MultiDomainServerState(cookieStr);
- pos += length + 1;
-
- // Decode the baseDN
- length = getNextLength(in, pos);
- this.baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
- pos += length + 1;
-
- // Decode the changeNumber
- length = getNextLength(in, pos);
- this.changeNumber = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length + 1;
+ this.cookie = new MultiDomainServerState(scanner.nextString());
+ this.baseDN = scanner.nextDN();
+ this.changeNumber = scanner.nextIntUTF8();
// Decode the msg
- /* Read the mods : all the remaining bytes but the terminating 0 */
- length = in.length - pos - 1;
- byte[] encodedMsg = new byte[length];
- System.arraycopy(in, pos, encodedMsg, 0, length);
- ReplicationMsg rmsg = ReplicationMsg.generateMsg(
- encodedMsg, ProtocolVersion.getCurrentVersion());
- this.updateMsg = (LDAPUpdateMsg)rmsg;
+ this.updateMsg = (LDAPUpdateMsg) ReplicationMsg.generateMsg(
+ scanner.remainingBytesZeroTerminated(),
+ ProtocolVersion.getCurrentVersion());
}
catch(DirectoryException de)
{
throw new DataFormatException(de.toString());
}
- catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
}
/**
@@ -162,9 +138,7 @@
return updateMsg;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public String toString()
{
@@ -175,39 +149,19 @@
" serviceId: " + baseDN + "]";
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte[] getBytes(short protocolVersion)
- throws UnsupportedEncodingException
{
- byte[] byteCookie = String.valueOf(cookie).getBytes("UTF-8");
- byte[] byteBaseDN = String.valueOf(baseDN).getBytes("UTF-8");
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ builder.append(MSG_TYPE_ECL_UPDATE);
+ builder.append(String.valueOf(cookie));
+ builder.append(baseDN);
// FIXME JNR Changing the line below to use long would require a protocol
// version change. Leave it like this for now until the need arises.
- byte[] byteChangeNumber =
- Integer.toString((int) changeNumber).getBytes("UTF-8");
- byte[] byteUpdateMsg = updateMsg.getBytes(protocolVersion);
-
- int length = 1 + byteCookie.length +
- 1 + byteBaseDN.length +
- 1 + byteChangeNumber.length +
- 1 + byteUpdateMsg.length + 1;
-
- byte[] resultByteArray = new byte[length];
-
- /* Encode type */
- resultByteArray[0] = MSG_TYPE_ECL_UPDATE;
- int pos = 1;
-
- // Encode all fields
- pos = addByteArray(byteCookie, resultByteArray, pos);
- pos = addByteArray(byteBaseDN, resultByteArray, pos);
- pos = addByteArray(byteChangeNumber, resultByteArray, pos);
- pos = addByteArray(byteUpdateMsg, resultByteArray, pos);
-
- return resultByteArray;
+ builder.appendUTF8((int) changeNumber);
+ builder.appendZeroTerminated(updateMsg.getBytes(protocolVersion));
+ return builder.toByteArray();
}
/**
diff --git a/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java b/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java
index 7186d82..6ee39ac 100644
--- a/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java
@@ -22,11 +22,10 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2013 ForgeRock AS.
+ * Portions Copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
@@ -37,51 +36,39 @@
*/
public class EntryMsg extends RoutableMsg
{
- // The byte array containing the bytes of the entry transported
- private byte[] entryByteArray;
+ /** The byte array containing the bytes of the entry transported. */
+ private final byte[] entryByteArray;
private int msgId = -1; // from V4
/**
* Creates a new EntryMsg.
*
- * @param sender The sender of this message.
+ * @param serverID The sender of this message.
* @param destination The destination of this message.
* @param entryBytes The bytes of the entry.
* @param msgId Message counter.
*/
- public EntryMsg(
- int sender,
- int destination,
- byte[] entryBytes,
- int msgId)
+ public EntryMsg(int serverID, int destination, byte[] entryBytes, int msgId)
{
- super(sender, destination);
- this.entryByteArray = new byte[entryBytes.length];
- System.arraycopy(entryBytes, 0, this.entryByteArray, 0, entryBytes.length);
- this.msgId = msgId;
+ this(serverID, destination, entryBytes, 0, entryBytes.length, msgId);
}
/**
* Creates a new EntryMsg.
*
* @param serverID The sender of this message.
- * @param i The destination of this message.
+ * @param destination The destination of this message.
* @param entryBytes The bytes of the entry.
- * @param pos The starting Position in the array.
+ * @param startPos The starting Position in the array.
* @param length Number of array elements to be copied.
* @param msgId Message counter.
*/
- public EntryMsg(
- int serverID,
- int i,
- byte[] entryBytes,
- int pos,
- int length,
- int msgId)
+ public EntryMsg(int serverID, int destination, byte[] entryBytes, int startPos,
+ int length, int msgId)
{
- super(serverID, i);
+ super(serverID, destination);
this.entryByteArray = new byte[length];
- System.arraycopy(entryBytes, pos, this.entryByteArray, 0, length);
+ System.arraycopy(entryBytes, startPos, this.entryByteArray, 0, length);
this.msgId = msgId;
}
@@ -93,47 +80,22 @@
* @throws DataFormatException If the byte array does not contain a valid
* encoded form of the ServerStartMessage.
*/
- public EntryMsg(byte[] in, short version) throws DataFormatException
+ EntryMsg(byte[] in, short version) throws DataFormatException
{
- try
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ final byte msgType = scanner.nextByte();
+ if (msgType != MSG_TYPE_ENTRY)
{
- /* first byte is the type */
- if (in[0] != MSG_TYPE_ENTRY)
- throw new DataFormatException("input is not a valid " +
- this.getClass().getCanonicalName());
- int pos = 1;
-
- // sender
- int length = getNextLength(in, pos);
- String senderIDString = new String(in, pos, length, "UTF-8");
- this.senderID = Integer.valueOf(senderIDString);
- pos += length +1;
-
- // destination
- length = getNextLength(in, pos);
- String destinationString = new String(in, pos, length, "UTF-8");
- this.destination = Integer.valueOf(destinationString);
- pos += length +1;
-
- // msgCnt
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- // msgCnt
- length = getNextLength(in, pos);
- String msgcntString = new String(in, pos, length, "UTF-8");
- this.msgId = Integer.valueOf(msgcntString);
- pos += length +1;
- }
-
- // data
- length = in.length - (pos + 1);
- this.entryByteArray = new byte[length];
- System.arraycopy(in, pos, entryByteArray, 0, length);
+ throw new DataFormatException("input is not a valid "
+ + getClass().getCanonicalName());
}
- catch (UnsupportedEncodingException e)
+ this.senderID = scanner.nextIntUTF8();
+ this.destination = scanner.nextIntUTF8();
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ this.msgId = scanner.nextIntUTF8();
}
+ this.entryByteArray = scanner.remainingBytesZeroTerminated();
}
/**
@@ -145,46 +107,20 @@
return entryByteArray;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte[] getBytes(short version)
{
- try {
- byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
- byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
- byte[] msgCntBytes = null;
- byte[] entryBytes = entryByteArray;
-
- int length = 1 + senderBytes.length +
- 1 + destinationBytes.length +
- 1 + entryBytes.length + 1;
-
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- msgCntBytes = String.valueOf(msgId).getBytes("UTF-8");
- length += (1 + msgCntBytes.length);
- }
-
- byte[] resultByteArray = new byte[length];
-
- /* put the type of the operation */
- resultByteArray[0] = MSG_TYPE_ENTRY;
- int pos = 1;
-
- pos = addByteArray(senderBytes, resultByteArray, pos);
- pos = addByteArray(destinationBytes, resultByteArray, pos);
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- pos = addByteArray(msgCntBytes, resultByteArray, pos);
- pos = addByteArray(entryBytes, resultByteArray, pos);
-
- return resultByteArray;
- }
- catch (UnsupportedEncodingException e)
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ builder.append(MSG_TYPE_ENTRY);
+ builder.appendUTF8(senderID);
+ builder.appendUTF8(destination);
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
- return null;
+ builder.appendUTF8(msgId);
}
+ builder.appendZeroTerminated(entryByteArray);
+ return builder.toByteArray();
}
/**
diff --git a/opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java b/opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java
index 362be92..87b6ef2 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java
@@ -22,20 +22,17 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS.
+ * Portions copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
-import org.opends.messages.Message;
-
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-
-import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
+import org.opends.messages.Message;
import org.opends.server.loggers.debug.DebugTracer;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.*;
+
/**
* This message is part of the replication protocol.
* This message is sent by a server or a replication server when an error
@@ -43,18 +40,21 @@
*/
public class ErrorMsg extends RoutableMsg
{
- // The tracer object for the debug logger
+ /** The tracer object for the debug logger */
private static final DebugTracer TRACER = getTracer();
- // Specifies the messageID built from the error that was detected
- private int msgID;
+ /** Specifies the messageID built from the error that was detected */
+ private final int msgID;
- // Specifies the complementary details about the error that was detected
- private Message details = null;
+ /** Specifies the complementary details about the error that was detected */
+ private final Message details;
- // The time of creation of this message.
- // protocol version previous to V4
- private Long creationTime = System.currentTimeMillis();
+ /**
+ * The time of creation of this message.
+ * <p>
+ * protocol version previous to V4
+ */
+ private long creationTime = System.currentTimeMillis();
/**
* Creates an ErrorMsg providing the destination server.
@@ -63,8 +63,7 @@
* @param destination The destination server or servers of this message.
* @param details The message containing the details of the error.
*/
- public ErrorMsg(int sender, int destination,
- Message details)
+ public ErrorMsg(int sender, int destination, Message details)
{
super(sender, destination);
this.msgID = details.getDescriptor().getId();
@@ -72,8 +71,10 @@
this.creationTime = System.currentTimeMillis();
if (debugEnabled())
- TRACER.debugInfo(" Creating error message" + this.toString()
- + " " + stackTraceToSingleLineString(new Exception("trace")));
+ {
+ TRACER.debugInfo(" Creating error message" + this + " "
+ + stackTraceToSingleLineString(new Exception("trace")));
+ }
}
/**
@@ -90,7 +91,9 @@
this.creationTime = System.currentTimeMillis();
if (debugEnabled())
- TRACER.debugInfo(this.toString());
+ {
+ TRACER.debugInfo(toString());
+ }
}
/**
@@ -101,53 +104,23 @@
* @throws DataFormatException If the in does not contain a properly
* encoded message.
*/
- public ErrorMsg(byte[] in, short version)
- throws DataFormatException
+ ErrorMsg(byte[] in, short version) throws DataFormatException
{
- super();
- try
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ final byte msgType = scanner.nextByte();
+ if (msgType != MSG_TYPE_ERROR)
{
- /* first byte is the type */
- if (in[0] != MSG_TYPE_ERROR)
- throw new DataFormatException("input is not a valid " +
- this.getClass().getCanonicalName());
- int pos = 1;
-
- // sender
- int length = getNextLength(in, pos);
- String senderString = new String(in, pos, length, "UTF-8");
- senderID = Integer.valueOf(senderString);
- pos += length +1;
-
- // destination
- length = getNextLength(in, pos);
- String serverIdString = new String(in, pos, length, "UTF-8");
- destination = Integer.valueOf(serverIdString);
- pos += length +1;
-
- // MsgID
- length = getNextLength(in, pos);
- String msgIdString = new String(in, pos, length, "UTF-8");
- msgID = Integer.valueOf(msgIdString);
- pos += length +1;
-
- // Details
- length = getNextLength(in, pos);
- details = Message.raw(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- // Creation Time
- length = getNextLength(in, pos);
- String creationTimeString = new String(in, pos, length, "UTF-8");
- creationTime = Long.valueOf(creationTimeString);
- pos += length +1;
- }
+ throw new DataFormatException("input is not a valid "
+ + getClass().getCanonicalName());
}
- catch (UnsupportedEncodingException e)
+ senderID = scanner.nextIntUTF8();
+ destination = scanner.nextIntUTF8();
+ msgID = scanner.nextIntUTF8();
+ details = Message.raw(scanner.nextString());
+
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ creationTime = scanner.nextLongUTF8();
}
}
@@ -175,60 +148,21 @@
// Msg encoding
// ============
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte[] getBytes(short version)
{
- try {
- byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
- byte[] byteDestination = String.valueOf(destination).getBytes("UTF-8");
- byte[] byteErrMsgId = String.valueOf(msgID).getBytes("UTF-8");
- byte[] byteDetails = details.toString().getBytes("UTF-8");
- byte[] byteCreationTime = null;
-
- int length = 1 + byteSender.length + 1
- + byteDestination.length + 1
- + byteErrMsgId.length + 1
- + byteDetails.length + 1;
-
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- byteCreationTime = creationTime.toString().getBytes("UTF-8");
- length += byteCreationTime.length + 1;
- }
-
- byte[] resultByteArray = new byte[length];
-
- // put the type of the operation
- resultByteArray[0] = MSG_TYPE_ERROR;
- int pos = 1;
-
- // sender
- pos = addByteArray(byteSender, resultByteArray, pos);
-
- // destination
- pos = addByteArray(byteDestination, resultByteArray, pos);
-
- // MsgId
- pos = addByteArray(byteErrMsgId, resultByteArray, pos);
-
- // details
- pos = addByteArray(byteDetails, resultByteArray, pos);
-
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- // creation time
- pos = addByteArray(byteCreationTime, resultByteArray, pos);
- }
-
- return resultByteArray;
- }
- catch (UnsupportedEncodingException e)
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ builder.append(MSG_TYPE_ERROR);
+ builder.appendUTF8(senderID);
+ builder.appendUTF8(destination);
+ builder.appendUTF8(msgID);
+ builder.append(details.toString());
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
- return null;
+ builder.appendUTF8(creationTime);
}
+ return builder.toByteArray();
}
/**
@@ -236,6 +170,7 @@
*
* @return the string representation of this message.
*/
+ @Override
public String toString()
{
return "ErrorMessage=["+
@@ -254,7 +189,7 @@
*
* @return the creation time of this message.
*/
- public Long getCreationTime()
+ public long getCreationTime()
{
return creationTime;
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/HeartbeatMsg.java b/opends/src/server/org/opends/server/replication/protocol/HeartbeatMsg.java
index e2d7d4d..a117e90 100644
--- a/opends/src/server/org/opends/server/replication/protocol/HeartbeatMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/HeartbeatMsg.java
@@ -22,9 +22,8 @@
*
*
* Copyright 2008 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS.
+ * Portions copyright 2013-2014 ForgeRock AS.
*/
-
package org.opends.server.replication.protocol;
import java.util.zip.DataFormatException;
@@ -52,20 +51,18 @@
* @throws java.util.zip.DataFormatException If the byte array does not
* contain a valid encoded form of the message.
*/
- public HeartbeatMsg(byte[] in) throws DataFormatException
+ HeartbeatMsg(byte[] in) throws DataFormatException
{
/* The heartbeat message is encoded in the form :
* <msg-type>
*/
-
- /* first byte is the type */
if (in.length != 1 || in[0] != MSG_TYPE_HEARTBEAT)
+ {
throw new DataFormatException("Input is not a valid Heartbeat Message.");
+ }
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte[] getBytes(short protocolVersion)
{
@@ -73,13 +70,7 @@
* The heartbeat message contains:
* <msg-type>
*/
- int length = 1;
- byte[] resultByteArray = new byte[length];
-
- /* put the message type */
- resultByteArray[0] = MSG_TYPE_HEARTBEAT;
-
- return resultByteArray;
+ return new byte[] { MSG_TYPE_HEARTBEAT };
}
/** {@inheritDoc} */
diff --git a/opends/src/server/org/opends/server/replication/protocol/InitializeRcvAckMsg.java b/opends/src/server/org/opends/server/replication/protocol/InitializeRcvAckMsg.java
index a06be34..c89e81e 100644
--- a/opends/src/server/org/opends/server/replication/protocol/InitializeRcvAckMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/InitializeRcvAckMsg.java
@@ -22,14 +22,12 @@
*
*
* Copyright 2010 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS.
+ * Portions copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
-
/**
* This message is used by LDAP server or by Replication Servers to
* update the send window of the remote entities.
@@ -43,7 +41,6 @@
{
private final int numAck;
-
/**
* Create a new message..
*
@@ -65,84 +62,37 @@
* @throws DataFormatException If the byte array does not contain a valid
* encoded form of the message.
*/
- public InitializeRcvAckMsg(byte[] in) throws DataFormatException
+ InitializeRcvAckMsg(byte[] in) throws DataFormatException
{
- super();
- try
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ if (scanner.nextByte() != MSG_TYPE_INITIALIZE_RCV_ACK)
{
- // msg type
- if (in[0] != MSG_TYPE_INITIALIZE_RCV_ACK)
- throw new DataFormatException("input is not a valid "
- + this.getClass().getCanonicalName());
- int pos = 1;
-
- // sender
- int length = getNextLength(in, pos);
- String senderString = new String(in, pos, length, "UTF-8");
- senderID = Integer.valueOf(senderString);
- pos += length +1;
-
- // destination
- length = getNextLength(in, pos);
- String serverIdString = new String(in, pos, length, "UTF-8");
- destination = Integer.valueOf(serverIdString);
- pos += length +1;
-
- // value fo the ack
- length = getNextLength(in, pos);
- String numAckStr = new String(in, pos, length, "UTF-8");
- pos += length +1;
- numAck = Integer.parseInt(numAckStr);
- } catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ throw new DataFormatException("input is not a valid "
+ + getClass().getCanonicalName());
}
+
+ senderID = scanner.nextIntUTF8();
+ destination = scanner.nextIntUTF8();
+ numAck = scanner.nextIntUTF8();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte[] getBytes(short protocolVersion)
{
- try {
- byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
- byte[] byteDestination = String.valueOf(destination).getBytes("UTF-8");
- byte[] byteNumAck = String.valueOf(numAck).getBytes("UTF-8");
-
- int length = 1 + byteSender.length + 1
- + byteDestination.length + 1
- + byteNumAck.length + 1;
-
- byte[] resultByteArray = new byte[length];
-
- /* put the type of the operation */
- resultByteArray[0] = MSG_TYPE_INITIALIZE_RCV_ACK;
- int pos = 1;
-
- // sender
- pos = addByteArray(byteSender, resultByteArray, pos);
-
- // destination
- pos = addByteArray(byteDestination, resultByteArray, pos);
-
- // ack value
- pos = addByteArray(byteNumAck, resultByteArray, pos);
-
- return resultByteArray;
- }
- catch (UnsupportedEncodingException e)
- {
- return null;
- }
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ builder.append(MSG_TYPE_INITIALIZE_RCV_ACK);
+ builder.appendUTF8(senderID);
+ builder.appendUTF8(destination);
+ builder.appendUTF8(numAck);
+ return builder.toByteArray();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
+ @Override
public String toString()
{
- return this.getClass().getSimpleName() + "=["+
+ return getClass().getSimpleName() + "=[" +
" sender=" + this.senderID +
" destination=" + this.destination +
" msgID=" + this.numAck + "]";
diff --git a/opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java b/opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java
index 9269105..6a1a695 100644
--- a/opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java
@@ -22,15 +22,13 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS.
+ * Portions copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.types.DN;
-import org.opends.server.types.DirectoryException;
/**
* This message is part of the replication protocol.
@@ -40,7 +38,7 @@
*/
public class InitializeRequestMsg extends RoutableMsg
{
- private DN baseDN;
+ private final DN baseDN;
private int initWindow = 0;
/**
@@ -66,51 +64,22 @@
* @throws DataFormatException If the in does not contain a properly
* encoded InitializeMessage.
*/
- public InitializeRequestMsg(byte[] in, short version)
- throws DataFormatException
+ InitializeRequestMsg(byte[] in, short version) throws DataFormatException
{
- super();
- try
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ final byte msgType = scanner.nextByte();
+ if (msgType != MSG_TYPE_INITIALIZE_REQUEST)
{
- /* first byte is the type */
- if (in[0] != MSG_TYPE_INITIALIZE_REQUEST)
- throw new DataFormatException(
- "input is not a valid InitializeRequestMessage");
- int pos = 1;
-
- // baseDN
- int length = getNextLength(in, pos);
- baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- // sender
- length = getNextLength(in, pos);
- String sourceServerIdString = new String(in, pos, length, "UTF-8");
- senderID = Integer.valueOf(sourceServerIdString);
- pos += length +1;
-
- // destination
- length = getNextLength(in, pos);
- String destinationServerIdString = new String(in, pos, length, "UTF-8");
- destination = Integer.valueOf(destinationServerIdString);
- pos += length +1;
-
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- // init window
- length = getNextLength(in, pos);
- String initWindowString = new String(in, pos, length, "UTF-8");
- initWindow = Integer.valueOf(initWindowString);
- pos += length +1;
- }
+ throw new DataFormatException(
+ "input is not a valid InitializeRequestMessage");
}
- catch (UnsupportedEncodingException e)
+ baseDN = scanner.nextDN();
+ senderID = scanner.nextIntUTF8();
+ destination = scanner.nextIntUTF8();
+
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
- catch (DirectoryException e)
- {
- throw new DataFormatException(e.getLocalizedMessage());
+ initWindow = scanner.nextIntUTF8();
}
}
@@ -128,54 +97,20 @@
// Msg encoding
// ============
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte[] getBytes(short version)
{
- try {
- byte[] baseDNBytes = baseDN.toNormalizedString().getBytes("UTF-8");
- byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
- byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
- byte[] initWindowBytes = null;
-
- int length = 1 + baseDNBytes.length + 1 + senderBytes.length + 1
- + destinationBytes.length + 1;
-
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- initWindowBytes = String.valueOf(initWindow).getBytes("UTF-8");
- length += initWindowBytes.length + 1;
- }
-
- byte[] resultByteArray = new byte[length];
-
- // type of the operation
- resultByteArray[0] = MSG_TYPE_INITIALIZE_REQUEST;
- int pos = 1;
-
- // baseDN
- pos = addByteArray(baseDNBytes, resultByteArray, pos);
-
- // sender
- pos = addByteArray(senderBytes, resultByteArray, pos);
-
- // destination
- pos = addByteArray(destinationBytes, resultByteArray, pos);
-
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- // init window
- pos = addByteArray(initWindowBytes, resultByteArray, pos);
- }
-
- return resultByteArray;
- }
- catch (UnsupportedEncodingException e)
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ builder.append(MSG_TYPE_INITIALIZE_REQUEST);
+ builder.append(baseDN);
+ builder.appendUTF8(senderID);
+ builder.appendUTF8(destination);
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
- return null;
+ builder.appendUTF8(initWindow);
}
+ return builder.toByteArray();
}
/**
diff --git a/opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java b/opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java
index 22003b6..f528429 100644
--- a/opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java
@@ -22,15 +22,13 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS.
+ * Portions copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.types.DN;
-import org.opends.server.types.DirectoryException;
/**
* This message is part of the replication protocol.
@@ -39,17 +37,17 @@
*/
public class InitializeTargetMsg extends RoutableMsg
{
- private DN baseDN;
+ private final DN baseDN;
/** Specifies the number of entries expected to be exported. */
- private long entryCount;
+ private final long entryCount;
/**
* Specifies the serverID of the server that requested this export to happen.
* It allows a server that previously sent an InitializeRequestMessage to know
* that the current message is related to its own request.
*/
- private int requestorID;
+ private final int requestorID;
private int initWindow;
@@ -80,63 +78,24 @@
* @throws DataFormatException If the in does not contain a properly
* encoded InitializeMessage.
*/
- public InitializeTargetMsg(byte[] in, short version)
- throws DataFormatException
+ InitializeTargetMsg(byte[] in, short version) throws DataFormatException
{
- super();
- try
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ final byte msgType = scanner.nextByte();
+ if (msgType != MSG_TYPE_INITIALIZE_TARGET)
{
- /* first byte is the type */
- if (in[0] != MSG_TYPE_INITIALIZE_TARGET)
- throw new DataFormatException(
- "input is not a valid InitializeDestinationMessage");
- int pos = 1;
-
- // destination
- int length = getNextLength(in, pos);
- String destinationString = new String(in, pos, length, "UTF-8");
- this.destination = Integer.valueOf(destinationString);
- pos += length +1;
-
- // baseDN
- length = getNextLength(in, pos);
- baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- // sender
- length = getNextLength(in, pos);
- String senderString = new String(in, pos, length, "UTF-8");
- senderID = Integer.valueOf(senderString);
- pos += length +1;
-
- // requestor
- length = getNextLength(in, pos);
- String requestorString = new String(in, pos, length, "UTF-8");
- requestorID = Integer.valueOf(requestorString);
- pos += length +1;
-
- // entryCount
- length = getNextLength(in, pos);
- String entryCountString = new String(in, pos, length, "UTF-8");
- entryCount = Long.valueOf(entryCountString);
- pos += length +1;
-
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- // init window
- length = getNextLength(in, pos);
- String initWindowString = new String(in, pos, length, "UTF-8");
- initWindow = Integer.valueOf(initWindowString);
- pos += length +1;
- }
+ throw new DataFormatException(
+ "input is not a valid InitializeDestinationMessage");
}
- catch (UnsupportedEncodingException e)
+ destination = scanner.nextIntUTF8();
+ baseDN = scanner.nextDN();
+ senderID = scanner.nextIntUTF8();
+ requestorID = scanner.nextIntUTF8();
+ entryCount = scanner.nextLongUTF8();
+
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
- catch (DirectoryException e)
- {
- throw new DataFormatException(e.getLocalizedMessage());
+ initWindow = scanner.nextIntUTF8();
}
}
@@ -185,66 +144,22 @@
// Msg encoding
// ============
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte[] getBytes(short version)
- throws UnsupportedEncodingException
{
- try
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ builder.append(MSG_TYPE_INITIALIZE_TARGET);
+ builder.appendUTF8(destination);
+ builder.append(baseDN);
+ builder.appendUTF8(senderID);
+ builder.appendUTF8(requestorID);
+ builder.appendUTF8(entryCount);
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
- byte[] byteDestination = String.valueOf(destination).getBytes("UTF-8");
- byte[] byteDn = baseDN.toNormalizedString().getBytes("UTF-8");
- byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
- byte[] byteRequestor = String.valueOf(requestorID).getBytes("UTF-8");
- byte[] byteEntryCount = String.valueOf(entryCount).getBytes("UTF-8");
- byte[] byteInitWindow = null;
- int length = 1 + byteDestination.length + 1
- + byteDn.length + 1
- + byteSender.length + 1
- + byteRequestor.length + 1
- + byteEntryCount.length + 1;
-
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- byteInitWindow = String.valueOf(initWindow).getBytes("UTF-8");
- length += byteInitWindow.length + 1;
- }
-
- byte[] resultByteArray = new byte[length];
-
- /* put the type of the operation */
- resultByteArray[0] = MSG_TYPE_INITIALIZE_TARGET;
- int pos = 1;
-
- /* put the destination */
- pos = addByteArray(byteDestination, resultByteArray, pos);
-
- /* put the baseDN and a terminating 0 */
- pos = addByteArray(byteDn, resultByteArray, pos);
-
- /* put the sender */
- pos = addByteArray(byteSender, resultByteArray, pos);
-
- /* put the requestorID */
- pos = addByteArray(byteRequestor, resultByteArray, pos);
-
- /* put the entryCount */
- pos = addByteArray(byteEntryCount, resultByteArray, pos);
-
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- /* put the initWindow */
- pos = addByteArray(byteInitWindow, resultByteArray, pos);
- }
-
- return resultByteArray;
+ builder.appendUTF8(initWindow);
}
- catch (UnsupportedEncodingException e)
- {
- return null;
- }
+ return builder.toByteArray();
}
/**
diff --git a/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java b/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
index 2efa4b1..a47d015 100644
--- a/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
@@ -22,11 +22,10 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.zip.DataFormatException;
@@ -83,7 +82,7 @@
* @param dn The DN of the entry on which the change
* that caused the creation of this object happened
*/
- public LDAPUpdateMsg(OperationContext ctx, DN dn)
+ LDAPUpdateMsg(OperationContext ctx, DN dn)
{
this.protocolVersion = ProtocolVersion.getCurrentVersion();
this.csn = ctx.getCSN();
@@ -101,7 +100,7 @@
* @param dn The DN of the entry on which the change
* that caused the creation of this object happened
*/
- public LDAPUpdateMsg(CSN csn, String entryUUID, DN dn)
+ LDAPUpdateMsg(CSN csn, String entryUUID, DN dn)
{
this.protocolVersion = ProtocolVersion.getCurrentVersion();
this.csn = csn;
@@ -117,27 +116,19 @@
*/
public static LDAPUpdateMsg generateMsg(PostOperationOperation op)
{
- LDAPUpdateMsg msg = null;
switch (op.getOperationType())
{
case MODIFY :
- msg = new ModifyMsg((PostOperationModifyOperation) op);
- break;
-
+ return new ModifyMsg((PostOperationModifyOperation) op);
case ADD:
- msg = new AddMsg((PostOperationAddOperation) op);
- break;
-
+ return new AddMsg((PostOperationAddOperation) op);
case DELETE :
- msg = new DeleteMsg((PostOperationDeleteOperation) op);
- break;
-
+ return new DeleteMsg((PostOperationDeleteOperation) op);
case MODIFY_DN :
- msg = new ModifyDNMsg( (PostOperationModifyDNOperation) op);
- break;
+ return new ModifyDNMsg( (PostOperationModifyDNOperation) op);
+ default:
+ return null;
}
-
- return msg;
}
/**
@@ -210,138 +201,62 @@
* of a synchronized portion of code.
*
* This method is not synchronized and therefore not MT safe.
- *
- * @throws UnsupportedEncodingException when encoding fails.
*/
- public void encode() throws UnsupportedEncodingException
+ public void encode()
{
bytes = getBytes(ProtocolVersion.getCurrentVersion());
}
- /**
- * Encode the common header for all the UpdateMsg. This uses the current
- * protocol version.
- *
- * @param type the type of UpdateMsg to encode.
- * @param additionalLength additional length needed to encode the remaining
- * part of the UpdateMsg.
- * @param version The ProtocolVersion to use when encoding.
- * @return a byte array containing the common header and enough space to
- * encode the remaining bytes of the UpdateMsg as was specified
- * by the additionalLength.
- * (byte array length = common header length + additionalLength)
- * @throws UnsupportedEncodingException if UTF-8 is not supported.
- */
+ /** {@inheritDoc} */
@Override
- public byte[] encodeHeader(byte type, int additionalLength, short version)
- throws UnsupportedEncodingException
+ public ByteArrayBuilder encodeHeader(byte msgType, short protocolVersion)
{
- byte[] byteDn = dn.toString().getBytes("UTF-8");
- byte[] csnByte = getCSN().toString().getBytes("UTF-8");
- byte[] byteEntryuuid = getEntryUUID().getBytes("UTF-8");
-
/* The message header is stored in the form :
* <operation type><protocol version><CSN><dn><entryuuid><assured>
* <assured mode> <safe data level>
- * the length of result byte array is therefore :
- * 1 + 1 + CSN length + 1 + dn length + 1 + uuid length + 1 + 1
- * + 1 + 1 + additional_length
*/
- int length = 8 + csnByte.length + byteDn.length
- + byteEntryuuid.length + additionalLength;
-
- byte[] encodedMsg = new byte[length];
-
- // put the type of the operation
- encodedMsg[0] = type;
-
- // put the protocol version
- encodedMsg[1] = (byte) version;
- int pos = 2;
-
- // Put the CSN
- pos = addByteArray(csnByte, encodedMsg, pos);
-
- // Put the DN and a terminating 0
- pos = addByteArray(byteDn, encodedMsg, pos);
-
- // Put the entry uuid and a terminating 0
- pos = addByteArray(byteEntryuuid, encodedMsg, pos);
-
- // Put the assured flag
- encodedMsg[pos++] = (assuredFlag ? (byte) 1 : 0);
-
- // Put the assured mode
- encodedMsg[pos++] = assuredMode.getValue();
-
- // Put the safe data level
- encodedMsg[pos++] = safeDataLevel;
-
- return encodedMsg;
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ builder.append(msgType);
+ builder.append((byte) protocolVersion);
+ builder.appendUTF8(csn);
+ builder.append(dn);
+ builder.append(entryUUID);
+ builder.append(assuredFlag);
+ builder.append(assuredMode.getValue());
+ builder.append(safeDataLevel);
+ return builder;
}
/**
* Encode the common header for all the UpdateMessage. This uses the version
* 1 of the replication protocol (used for compatibility purpose).
*
- * @param type the type of UpdateMessage to encode.
- * @param additionalLength additional length needed to encode the remaining
- * part of the UpdateMessage.
- * @return a byte array containing the common header and enough space to
- * encode the remaining bytes of the UpdateMessage as was specified
- * by the additionalLength.
- * (byte array length = common header length + additionalLength)
- * @throws UnsupportedEncodingException if UTF-8 is not supported.
+ * @param msgType the type of UpdateMessage to encode.
+ * @return a byte array builder containing the common header
*/
- public byte[] encodeHeader_V1(byte type, int additionalLength)
- throws UnsupportedEncodingException
+ ByteArrayBuilder encodeHeader_V1(byte msgType)
{
- byte[] byteDn = dn.toString().getBytes("UTF-8");
- byte[] csnByte = getCSN().toString().getBytes("UTF-8");
- byte[] byteEntryuuid = getEntryUUID().getBytes("UTF-8");
-
/* The message header is stored in the form :
* <operation type><CSN><dn><assured><entryuuid><change>
- * the length of result byte array is therefore :
- * 1 + CSN length + 1 + dn length + 1 + 1 +
- * uuid length + 1 + additional_length
*/
- int length = 5 + csnByte.length + byteDn.length
- + byteEntryuuid.length + additionalLength;
-
- byte[] encodedMsg = new byte[length];
-
- // put the type of the operation
- encodedMsg[0] = type;
- int pos = 1;
-
- // put the CSN
- pos = addByteArray(csnByte, encodedMsg, pos);
-
- // put the assured information
- encodedMsg[pos++] = (assuredFlag ? (byte) 1 : 0);
-
- // put the DN and a terminating 0
- pos = addByteArray(byteDn, encodedMsg, pos);
-
- // put the entry uuid and a terminating 0
- pos = addByteArray(byteEntryuuid, encodedMsg, pos);
-
- return encodedMsg;
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ builder.append(msgType);
+ builder.appendUTF8(csn);
+ builder.append(assuredFlag);
+ builder.append(dn);
+ builder.append(entryUUID);
+ return builder;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
- public byte[] getBytes(short reqProtocolVersion)
- throws UnsupportedEncodingException
+ public byte[] getBytes(short protocolVersion)
{
- if (reqProtocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
+ if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
{
return getBytes_V1();
}
- else if (reqProtocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
+ else if (protocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
{
return getBytes_V23();
}
@@ -351,7 +266,7 @@
if (bytes == null)
{
// this is the current version of the protocol
- bytes = getBytes_V45(reqProtocolVersion);
+ bytes = getBytes_V45(protocolVersion);
}
return bytes;
}
@@ -362,45 +277,35 @@
* 1 of the replication protocol (used for compatibility purpose).
*
* @return The byte array representation of this Message.
- *
- * @throws UnsupportedEncodingException When the encoding of the message
- * failed because the UTF-8 encoding is not supported.
*/
- public abstract byte[] getBytes_V1() throws UnsupportedEncodingException;
+ protected abstract byte[] getBytes_V1();
/**
* Get the byte array representation of this Message. This uses the version
* 2 of the replication protocol (used for compatibility purpose).
*
* @return The byte array representation of this Message.
- *
- * @throws UnsupportedEncodingException When the encoding of the message
- * failed because the UTF-8 encoding is not supported.
*/
- public abstract byte[] getBytes_V23() throws UnsupportedEncodingException;
-
+ protected abstract byte[] getBytes_V23();
/**
* Get the byte array representation of this Message. This uses the provided
* version number which must be version 4 or newer.
- * @param reqProtocolVersion TODO
*
+ * @param protocolVersion the actual protocol version to encode into
* @return The byte array representation of this Message.
- *
- * @throws UnsupportedEncodingException When the encoding of the message
- * failed because the UTF-8 encoding is not supported.
*/
- public abstract byte[] getBytes_V45(short reqProtocolVersion)
- throws UnsupportedEncodingException;
-
+ protected abstract byte[] getBytes_V45(short protocolVersion);
/**
* Encode a list of attributes.
*/
- static private byte[] encodeAttributes(Collection<Attribute> attributes)
+ private static byte[] encodeAttributes(Collection<Attribute> attributes)
{
if (attributes==null)
+ {
return new byte[0];
+ }
try
{
ByteStringBuilder byteBuilder = new ByteStringBuilder();
@@ -424,151 +329,62 @@
/**
* Decode the Header part of this Update Message, and check its type.
*
- * @param types The allowed types of this Update Message.
- * @param encodedMsg the encoded form of the UpdateMsg.
- * @return the position at which the remaining part of the message starts.
+ * @param scanner the encoded form of the UpdateMsg.
+ * @param allowedTypes The allowed types of this Update Message.
* @throws DataFormatException if the encodedMsg does not contain a valid
* common header.
*/
- public int decodeHeader(byte[] types, byte[] encodedMsg)
- throws DataFormatException
- {
- // first byte is the type
- boolean foundMatchingType = false;
- for (byte type : types)
- {
- if (type == encodedMsg[0])
- {
- foundMatchingType = true;
- break;
- }
- }
- if (!foundMatchingType)
- throw new DataFormatException("byte[] is not a valid update msg: "
- + encodedMsg[0]);
-
- /*
- * For older protocol version PDUs, decode the matching version header
- * instead.
- */
- if ((encodedMsg[0] == MSG_TYPE_ADD_V1) ||
- (encodedMsg[0] == MSG_TYPE_DELETE_V1) ||
- (encodedMsg[0] == MSG_TYPE_MODIFYDN_V1) ||
- (encodedMsg[0] == MSG_TYPE_MODIFY_V1))
- {
- return decodeHeader_V1(encodedMsg);
- }
-
- // read the protocol version
- protocolVersion = encodedMsg[1];
-
- try
- {
- // Read the CSN
- int pos = 2;
- int length = getNextLength(encodedMsg, pos);
- String csnStr = new String(encodedMsg, pos, length, "UTF-8");
- pos += length + 1;
- csn = new CSN(csnStr);
-
- // Read the dn
- length = getNextLength(encodedMsg, pos);
- dn = DN.decode(new String(encodedMsg, pos, length, "UTF-8"));
- pos += length + 1;
-
- // Read the entryuuid
- length = getNextLength(encodedMsg, pos);
- entryUUID = new String(encodedMsg, pos, length, "UTF-8");
- pos += length + 1;
-
- // Read the assured information
- assuredFlag = encodedMsg[pos++] == 1;
-
- // Read the assured mode
- assuredMode = AssuredMode.valueOf(encodedMsg[pos++]);
-
- // Read the safe data level
- safeDataLevel = encodedMsg[pos++];
-
- return pos;
- }
- catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
- catch (IllegalArgumentException e)
- {
- throw new DataFormatException(e.getLocalizedMessage());
- }
- catch (DirectoryException e)
- {
- throw new DataFormatException(e.getLocalizedMessage());
- }
- }
-
- /**
- * Decode the Header part of this Update Message, and check its type. This
- * uses the version 1 of the replication protocol (used for compatibility
- * purpose).
- *
- * @param encodedMsg the encoded form of the UpdateMessage.
- * @return the position at which the remaining part of the message starts.
- * @throws DataFormatException if the encodedMsg does not contain a valid
- * common header.
- */
- public int decodeHeader_V1(byte[] encodedMsg)
- throws DataFormatException
+ void decodeHeader(ByteArrayScanner scanner, byte... allowedTypes)
+ throws DataFormatException
{
- if ((encodedMsg[0] != MSG_TYPE_ADD_V1) &&
- (encodedMsg[0] != MSG_TYPE_DELETE_V1) &&
- (encodedMsg[0] != MSG_TYPE_MODIFYDN_V1) &&
- (encodedMsg[0] != MSG_TYPE_MODIFY_V1))
- throw new DataFormatException("byte[] is not a valid update msg: expected"
- + " a V1 PDU, received: " + encodedMsg[0]);
-
- // Force version to V1 (other new parameters take their default values
- // (assured stuff...))
- protocolVersion = ProtocolVersion.REPLICATION_PROTOCOL_V1;
-
- try
+ final byte msgType = scanner.nextByte();
+ if (!isTypeAllowed(msgType, allowedTypes))
{
- // read the CSN
- int pos = 1;
- int length = getNextLength(encodedMsg, pos);
- String csnStr = new String(encodedMsg, pos, length, "UTF-8");
- pos += length + 1;
- csn = new CSN(csnStr);
-
- // read the assured information
- assuredFlag = encodedMsg[pos++] == 1;
-
- // read the dn
- length = getNextLength(encodedMsg, pos);
- dn = DN.decode(new String(encodedMsg, pos, length, "UTF-8"));
- pos += length + 1;
-
- // read the entryuuid
- length = getNextLength(encodedMsg, pos);
- entryUUID = new String(encodedMsg, pos, length, "UTF-8");
- pos += length + 1;
-
- return pos;
+ throw new DataFormatException("byte[] is not a valid update msg: "
+ + msgType);
}
- catch (UnsupportedEncodingException e)
+
+ if (msgType == MSG_TYPE_ADD_V1
+ || msgType == MSG_TYPE_DELETE_V1
+ || msgType == MSG_TYPE_MODIFYDN_V1
+ || msgType == MSG_TYPE_MODIFY_V1)
{
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ /*
+ * For older protocol versions, decode the matching version header instead
+ */
+ // Force version to V1 (other new parameters take their default values
+ // (assured stuff...))
+ protocolVersion = ProtocolVersion.REPLICATION_PROTOCOL_V1;
+ csn = scanner.nextCSNUTF8();
+ assuredFlag = scanner.nextBoolean();
+ dn = scanner.nextDN();
+ entryUUID = scanner.nextString();
}
- catch (DirectoryException e)
+ else
{
- throw new DataFormatException(e.getLocalizedMessage());
+ protocolVersion = scanner.nextByte();
+ csn = scanner.nextCSNUTF8();
+ dn = scanner.nextDN();
+ entryUUID = scanner.nextString();
+ assuredFlag = scanner.nextBoolean();
+ assuredMode = AssuredMode.valueOf(scanner.nextByte());
+ safeDataLevel = scanner.nextByte();
}
}
- /**
- * Return the number of bytes used by this message.
- *
- * @return The number of bytes used by this message.
- */
+ private boolean isTypeAllowed(final byte msgType, byte... allowedTypes)
+ {
+ for (byte allowedType : allowedTypes)
+ {
+ if (msgType == allowedType)
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override
public abstract int size();
@@ -613,7 +429,7 @@
* @throws LDAPException when it occurs.
* @throws ASN1Exception when it occurs.
*/
- public ArrayList<RawAttribute> decodeRawAttributes(byte[] in)
+ ArrayList<RawAttribute> decodeRawAttributes(byte[] in)
throws LDAPException, ASN1Exception
{
ArrayList<RawAttribute> rattr = new ArrayList<RawAttribute>();
@@ -642,7 +458,7 @@
* @throws LDAPException when it occurs.
* @throws ASN1Exception when it occurs.
*/
- public ArrayList<Attribute> decodeAttributes(byte[] in)
+ ArrayList<Attribute> decodeAttributes(byte[] in)
throws LDAPException, ASN1Exception
{
ArrayList<Attribute> lattr = new ArrayList<Attribute>();
diff --git a/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java b/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
index 38e3bba..d8da663 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -26,7 +26,6 @@
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.zip.DataFormatException;
@@ -67,10 +66,8 @@
newSuperiorEntryUUID = ctx.getNewSuperiorEntryUUID();
deleteOldRdn = operation.deleteOldRDN();
- if (operation.getRawNewSuperior() != null)
- newSuperior = operation.getRawNewSuperior().toString();
- else
- newSuperior = null;
+ final ByteString rawNewSuperior = operation.getRawNewSuperior();
+ newSuperior = rawNewSuperior != null ? rawNewSuperior.toString() : null;
newRDN = operation.getRawNewRDN().toString();
}
@@ -129,23 +126,19 @@
*
* @param in The byte[] from which the operation must be read.
* @throws DataFormatException The input byte[] is not a valid ModifyDNMsg.
- * @throws UnsupportedEncodingException If UTF8 is not supported.
*/
- public ModifyDNMsg(byte[] in) throws DataFormatException,
- UnsupportedEncodingException
+ ModifyDNMsg(byte[] in) throws DataFormatException
{
- // Decode header
- byte[] allowedPduTypes = new byte[2];
- allowedPduTypes[0] = MSG_TYPE_MODIFYDN;
- allowedPduTypes[1] = MSG_TYPE_MODIFYDN_V1;
- int pos = decodeHeader(allowedPduTypes, in);
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ decodeHeader(scanner, MSG_TYPE_MODIFYDN, MSG_TYPE_MODIFYDN_V1);
- // protocol version has been read as part of the header
if (protocolVersion <= 3)
- decodeBody_V123(in, pos);
+ {
+ decodeBody_V123(scanner, in[0]);
+ }
else
{
- decodeBody_V4(in, pos);
+ decodeBody_V4(scanner);
}
if (protocolVersion==ProtocolVersion.getCurrentVersion())
@@ -184,349 +177,81 @@
/** {@inheritDoc} */
@Override
- public byte[] getBytes_V1() throws UnsupportedEncodingException
+ public byte[] getBytes_V1()
{
- byte[] byteNewRdn = newRDN.getBytes("UTF-8");
- byte[] byteNewSuperior = null;
- byte[] byteNewSuperiorId = null;
-
- // calculate the length necessary to encode the parameters
- int bodyLength = byteNewRdn.length + 1 + 1;
- if (newSuperior != null)
- {
- byteNewSuperior = newSuperior.getBytes("UTF-8");
- bodyLength += byteNewSuperior.length + 1;
- }
- else
- bodyLength += 1;
-
- if (newSuperiorEntryUUID != null)
- {
- byteNewSuperiorId = newSuperiorEntryUUID.getBytes("UTF-8");
- bodyLength += byteNewSuperiorId.length + 1;
- }
- else
- bodyLength += 1;
-
- byte[] encodedMsg = encodeHeader_V1(MSG_TYPE_MODIFYDN_V1, bodyLength);
- int pos = encodedMsg.length - bodyLength;
-
- /* put the new RDN and a terminating 0 */
- pos = addByteArray(byteNewRdn, encodedMsg, pos);
-
- /* put the newsuperior and a terminating 0 */
- if (newSuperior != null)
- {
- pos = addByteArray(byteNewSuperior, encodedMsg, pos);
- }
- else
- encodedMsg[pos++] = 0;
-
- /* put the newsuperiorId and a terminating 0 */
- if (newSuperiorEntryUUID != null)
- {
- pos = addByteArray(byteNewSuperiorId, encodedMsg, pos);
- }
- else
- encodedMsg[pos++] = 0;
-
- /* put the deleteoldrdn flag */
- if (deleteOldRdn)
- encodedMsg[pos++] = 1;
- else
- encodedMsg[pos++] = 0;
-
- return encodedMsg;
+ final ByteArrayBuilder builder = encodeHeader_V1(MSG_TYPE_MODIFYDN_V1);
+ builder.append(newRDN);
+ builder.append(newSuperior);
+ builder.append(newSuperiorEntryUUID);
+ builder.append(deleteOldRdn);
+ return builder.toByteArray();
}
/** {@inheritDoc} */
@Override
- public byte[] getBytes_V23() throws UnsupportedEncodingException
+ public byte[] getBytes_V23()
{
- // Encoding V2 / V3
-
- byte[] byteNewRdn = newRDN.getBytes("UTF-8");
- byte[] byteNewSuperior = null;
- byte[] byteNewSuperiorId = null;
-
- // calculate the length necessary to encode the parameters
- int length = byteNewRdn.length + 1 + 1;
- if (newSuperior != null)
- {
- byteNewSuperior = newSuperior.getBytes("UTF-8");
- length += byteNewSuperior.length + 1;
- }
- else
- length += 1;
-
- if (newSuperiorEntryUUID != null)
- {
- byteNewSuperiorId = newSuperiorEntryUUID.getBytes("UTF-8");
- length += byteNewSuperiorId.length + 1;
- }
- else
- length += 1;
-
- length += encodedMods.length + 1;
-
- /* encode the header in a byte[] large enough to also contain mods.. */
- byte[] encodedMsg = encodeHeader(MSG_TYPE_MODIFYDN, length,
- ProtocolVersion.REPLICATION_PROTOCOL_V3);
- int pos = encodedMsg.length - length;
-
- /* put the new RDN and a terminating 0 */
- pos = addByteArray(byteNewRdn, encodedMsg, pos);
-
- /* put the newsuperior and a terminating 0 */
- if (newSuperior != null)
- {
- pos = addByteArray(byteNewSuperior, encodedMsg, pos);
- }
- else
- encodedMsg[pos++] = 0;
-
- /* put the newsuperiorId and a terminating 0 */
- if (newSuperiorEntryUUID != null)
- {
- pos = addByteArray(byteNewSuperiorId, encodedMsg, pos);
- }
- else
- encodedMsg[pos++] = 0;
-
- /* put the deleteoldrdn flag */
- if (deleteOldRdn)
- encodedMsg[pos++] = 1;
- else
- encodedMsg[pos++] = 0;
-
- /* add the mods */
- if (encodedMods.length > 0)
- {
- pos = encodedMsg.length - (encodedMods.length + 1);
- addByteArray(encodedMods, encodedMsg, pos);
- }
- else
- encodedMsg[pos++] = 0;
-
- return encodedMsg;
+ final ByteArrayBuilder builder =
+ encodeHeader(MSG_TYPE_MODIFYDN,ProtocolVersion.REPLICATION_PROTOCOL_V3);
+ builder.append(newRDN);
+ builder.append(newSuperior);
+ builder.append(newSuperiorEntryUUID);
+ builder.append(deleteOldRdn);
+ builder.appendZeroTerminated(encodedMods);
+ return builder.toByteArray();
}
/** {@inheritDoc} */
@Override
- public byte[] getBytes_V45(short reqProtocolVersion)
- throws UnsupportedEncodingException
+ public byte[] getBytes_V45(short protocolVersion)
{
- byte[] byteNewSuperior = null;
- byte[] byteNewSuperiorId = null;
-
- // calculate the length necessary to encode the parameters
-
- byte[] byteNewRdn = newRDN.getBytes("UTF-8");
- int bodyLength = byteNewRdn.length + 1 + 1;
-
- if (newSuperior != null)
- {
- byteNewSuperior = newSuperior.getBytes("UTF-8");
- bodyLength += byteNewSuperior.length + 1;
- }
- else
- bodyLength += 1;
-
- if (newSuperiorEntryUUID != null)
- {
- byteNewSuperiorId = newSuperiorEntryUUID.getBytes("UTF-8");
- bodyLength += byteNewSuperiorId.length + 1;
- }
- else
- bodyLength += 1;
-
- byte[] byteModsLen =
- String.valueOf(encodedMods.length).getBytes("UTF-8");
- bodyLength += byteModsLen.length + 1;
- bodyLength += encodedMods.length + 1;
-
- byte[] byteEntryAttrLen =
- String.valueOf(encodedEclIncludes.length).getBytes("UTF-8");
- bodyLength += byteEntryAttrLen.length + 1;
- bodyLength += encodedEclIncludes.length + 1;
-
- /* encode the header in a byte[] large enough to also contain mods.. */
- byte[] encodedMsg = encodeHeader(MSG_TYPE_MODIFYDN, bodyLength,
- reqProtocolVersion);
-
- int pos = encodedMsg.length - bodyLength;
-
- /* put the new RDN and a terminating 0 */
- pos = addByteArray(byteNewRdn, encodedMsg, pos);
- /* put the newsuperior and a terminating 0 */
- if (newSuperior != null)
- {
- pos = addByteArray(byteNewSuperior, encodedMsg, pos);
- }
- else
- encodedMsg[pos++] = 0;
- /* put the newsuperiorId and a terminating 0 */
- if (newSuperiorEntryUUID != null)
- {
- pos = addByteArray(byteNewSuperiorId, encodedMsg, pos);
- }
- else
- encodedMsg[pos++] = 0;
-
- /* put the deleteoldrdn flag */
- if (deleteOldRdn)
- encodedMsg[pos++] = 1;
- else
- encodedMsg[pos++] = 0;
-
- pos = addByteArray(byteModsLen, encodedMsg, pos);
- pos = addByteArray(encodedMods, encodedMsg, pos);
-
- pos = addByteArray(byteEntryAttrLen, encodedMsg, pos);
- pos = addByteArray(encodedEclIncludes, encodedMsg, pos);
-
- return encodedMsg;
+ final ByteArrayBuilder builder =
+ encodeHeader(MSG_TYPE_MODIFYDN, protocolVersion);
+ builder.append(newRDN);
+ builder.append(newSuperior);
+ builder.append(newSuperiorEntryUUID);
+ builder.append(deleteOldRdn);
+ builder.appendUTF8(encodedMods.length);
+ builder.appendZeroTerminated(encodedMods);
+ builder.appendUTF8(encodedEclIncludes.length);
+ builder.appendZeroTerminated(encodedEclIncludes);
+ return builder.toByteArray();
}
// ============
// Msg decoding
// ============
- private void decodeBody_V123(byte[] in, int pos)
- throws DataFormatException, UnsupportedEncodingException
+ private void decodeBody_V123(ByteArrayScanner scanner, byte msgType)
+ throws DataFormatException
{
- /* read the newRDN
- * first calculate the length then construct the string
- */
- int length = getNextLength(in, pos);
- newRDN = new String(in, pos, length, "UTF-8");
- pos += length + 1;
-
- /* read the newSuperior
- * first calculate the length then construct the string
- */
- length = getNextLength(in, pos);
- if (length != 0)
- newSuperior = new String(in, pos, length, "UTF-8");
- else
- newSuperior = null;
- pos += length + 1;
-
- /* read the new parent Id
- */
- length = getNextLength(in, pos);
- if (length != 0)
- newSuperiorEntryUUID = new String(in, pos, length, "UTF-8");
- else
- newSuperiorEntryUUID = null;
- pos += length + 1;
-
- /* get the deleteoldrdn flag */
- deleteOldRdn = in[pos] != 0;
- pos++;
+ newRDN = scanner.nextString();
+ newSuperior = scanner.nextString();
+ newSuperiorEntryUUID = scanner.nextString();
+ deleteOldRdn = scanner.nextBoolean();
// For easiness (no additional method), simply compare PDU type to
// know if we have to read the mods of V2
- if (in[0] == MSG_TYPE_MODIFYDN)
+ if (msgType == MSG_TYPE_MODIFYDN)
{
- /* Read the mods : all the remaining bytes but the terminating 0 */
- length = in.length - pos - 1;
- if (length > 0) // Otherwise, there is only the trailing 0 byte which we
- // do not need to read
- {
- encodedMods = new byte[length];
- try
- {
- System.arraycopy(in, pos, encodedMods, 0, length);
- } catch (IndexOutOfBoundsException e)
- {
- throw new DataFormatException(e.getMessage());
- } catch (ArrayStoreException e)
- {
- throw new DataFormatException(e.getMessage());
- } catch (NullPointerException e)
- {
- throw new DataFormatException(e.getMessage());
- }
- }
+ encodedMods = scanner.remainingBytesZeroTerminated();
}
}
- private void decodeBody_V4(byte[] in, int pos)
- throws DataFormatException, UnsupportedEncodingException
+ private void decodeBody_V4(ByteArrayScanner scanner)
+ throws DataFormatException
{
- /* read the newRDN
- * first calculate the length then construct the string
- */
- int length = getNextLength(in, pos);
- newRDN = new String(in, pos, length, "UTF-8");
- pos += length + 1;
+ newRDN = scanner.nextString();
+ newSuperior = scanner.nextString();
+ newSuperiorEntryUUID = scanner.nextString();
+ deleteOldRdn = scanner.nextBoolean();
- /* read the newSuperior
- * first calculate the length then construct the string
- */
- length = getNextLength(in, pos);
- if (length != 0)
- newSuperior = new String(in, pos, length, "UTF-8");
- else
- newSuperior = null;
- pos += length + 1;
+ final int modsLen = scanner.nextIntUTF8();
+ encodedMods = scanner.nextByteArray(modsLen);
+ scanner.skipZeroSeparator();
- /* read the new parent Id
- */
- length = getNextLength(in, pos);
- if (length != 0)
- newSuperiorEntryUUID = new String(in, pos, length, "UTF-8");
- else
- newSuperiorEntryUUID = null;
- pos += length + 1;
-
- /* get the deleteoldrdn flag */
- deleteOldRdn = in[pos] != 0;
- pos++;
-
- // Read mods len
- length = getNextLength(in, pos);
- int modsLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
- pos += length + 1;
-
- // Read/Don't decode attributes
- this.encodedMods = new byte[modsLen];
- try
- {
- System.arraycopy(in, pos, encodedMods, 0, modsLen);
- } catch (IndexOutOfBoundsException e)
- {
- throw new DataFormatException(e.getMessage());
- } catch (ArrayStoreException e)
- {
- throw new DataFormatException(e.getMessage());
- } catch (NullPointerException e)
- {
- throw new DataFormatException(e.getMessage());
- }
- pos += modsLen + 1;
-
- // Read ecl attr len
- length = getNextLength(in, pos);
- int eclAttrLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
- pos += length + 1;
-
- // Read/Don't decode entry attributes
- encodedEclIncludes = new byte[eclAttrLen];
- try
- {
- System.arraycopy(in, pos, encodedEclIncludes, 0, eclAttrLen);
- } catch (IndexOutOfBoundsException e)
- {
- throw new DataFormatException(e.getMessage());
- } catch (ArrayStoreException e)
- {
- throw new DataFormatException(e.getMessage());
- } catch (NullPointerException e)
- {
- throw new DataFormatException(e.getMessage());
- }
+ final int eclAttrLen = scanner.nextIntUTF8();
+ encodedEclIncludes = scanner.nextByteArray(eclAttrLen);
}
/** {@inheritDoc} */
diff --git a/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java b/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
index 1a49391..33ec848 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
@@ -26,7 +26,6 @@
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.zip.DataFormatException;
@@ -50,7 +49,7 @@
*
* @param op The operation to use for building the message
*/
- public ModifyMsg(PostOperationModifyOperation op)
+ ModifyMsg(PostOperationModifyOperation op)
{
super((OperationContext) op.getAttachment(OperationContext.SYNCHROCONTEXT),
op.getEntryDN());
@@ -77,41 +76,35 @@
*
* @param in The byte[] from which the operation must be read.
* @throws DataFormatException If the input byte[] is not a valid ModifyMsg
- * @throws UnsupportedEncodingException If UTF8 is not supported by the JVM.
*/
- public ModifyMsg(byte[] in) throws DataFormatException,
- UnsupportedEncodingException
+ ModifyMsg(byte[] in) throws DataFormatException
{
- // Decode header
- byte[] allowedPduTypes = new byte[2];
- allowedPduTypes[0] = MSG_TYPE_MODIFY;
- allowedPduTypes[1] = MSG_TYPE_MODIFY_V1;
- int pos = decodeHeader(allowedPduTypes, in);
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ decodeHeader(scanner, MSG_TYPE_MODIFY, MSG_TYPE_MODIFY_V1);
- // protocol version has been read as part of the header
if (protocolVersion <= 3)
- decodeBody_V123(in, pos);
+ {
+ decodeBody_V123(scanner);
+ }
else
- decodeBody_V4(in, pos);
+ {
+ decodeBody_V4(scanner);
+ }
if (protocolVersion==ProtocolVersion.getCurrentVersion())
{
bytes = in;
}
-
}
/**
* Creates a new Modify message from a V1 byte[].
*
* @param in The byte[] from which the operation must be read.
- * @throws DataFormatException If the input byte[] is not a valid ModifyMsg
- * @throws UnsupportedEncodingException If UTF8 is not supported by the JVM.
- *
* @return The created ModifyMsg.
+ * @throws DataFormatException If the input byte[] is not a valid ModifyMsg
*/
- public static ModifyMsg createV1(byte[] in) throws DataFormatException,
- UnsupportedEncodingException
+ static ModifyMsg createV1(byte[] in) throws DataFormatException
{
ModifyMsg msg = new ModifyMsg(in);
@@ -127,7 +120,9 @@
DN newDN) throws LDAPException, ASN1Exception, DataFormatException
{
if (newDN == null)
+ {
newDN = getDN();
+ }
List<RawModification> ldapmods = decodeRawMods(encodedMods);
@@ -178,134 +173,53 @@
/** {@inheritDoc} */
@Override
- public byte[] getBytes_V1() throws UnsupportedEncodingException
+ public byte[] getBytes_V1()
{
- /* encode the header in a byte[] large enough to also contain the mods */
- byte[] encodedMsg = encodeHeader_V1(MSG_TYPE_MODIFY_V1, encodedMods.length +
- 1);
-
- /* add the mods */
- int pos = encodedMsg.length - (encodedMods.length + 1);
- addByteArray(encodedMods, encodedMsg, pos);
-
- return encodedMsg;
+ final ByteArrayBuilder builder = encodeHeader_V1(MSG_TYPE_MODIFY_V1);
+ builder.append(encodedMods);
+ return builder.toByteArray();
}
/** {@inheritDoc} */
@Override
- public byte[] getBytes_V23() throws UnsupportedEncodingException
+ public byte[] getBytes_V23()
{
- // Encoding V2 / V3
-
- /* encode the header in a byte[] large enough to also contain mods */
- byte[] encodedMsg = encodeHeader(MSG_TYPE_MODIFY, encodedMods.length + 1,
- ProtocolVersion.REPLICATION_PROTOCOL_V3);
-
- /* add the mods */
- int pos = encodedMsg.length - (encodedMods.length + 1);
- addByteArray(encodedMods, encodedMsg, pos);
-
- return encodedMsg;
+ final ByteArrayBuilder builder =
+ encodeHeader(MSG_TYPE_MODIFY, ProtocolVersion.REPLICATION_PROTOCOL_V3);
+ builder.append(encodedMods);
+ return builder.toByteArray();
}
/** {@inheritDoc} */
@Override
- public byte[] getBytes_V45(short reqProtocolVersion)
- throws UnsupportedEncodingException
+ public byte[] getBytes_V45(short protocolVersion)
{
- int bodyLength = 0;
- byte[] byteModsLen =
- String.valueOf(encodedMods.length).getBytes("UTF-8");
- bodyLength += byteModsLen.length + 1;
- bodyLength += encodedMods.length + 1;
-
- byte[] byteEntryAttrLen =
- String.valueOf(encodedEclIncludes.length).getBytes("UTF-8");
- bodyLength += byteEntryAttrLen.length + 1;
- bodyLength += encodedEclIncludes.length + 1;
-
- /* encode the header in a byte[] large enough to also contain the mods */
- byte [] encodedMsg = encodeHeader(MSG_TYPE_MODIFY, bodyLength,
- reqProtocolVersion);
-
- int pos = encodedMsg.length - bodyLength;
- pos = addByteArray(byteModsLen, encodedMsg, pos);
- pos = addByteArray(encodedMods, encodedMsg, pos);
- pos = addByteArray(byteEntryAttrLen, encodedMsg, pos);
- pos = addByteArray(encodedEclIncludes, encodedMsg, pos);
- return encodedMsg;
+ final ByteArrayBuilder builder =
+ encodeHeader(MSG_TYPE_MODIFY, protocolVersion);
+ builder.appendUTF8(encodedMods.length);
+ builder.append(encodedMods);
+ builder.appendUTF8(encodedEclIncludes.length);
+ builder.append(encodedEclIncludes);
+ return builder.toByteArray();
}
// ============
// Msg decoding
// ============
- private void decodeBody_V123(byte[] in, int pos)
- throws DataFormatException
+ private void decodeBody_V123(ByteArrayScanner scanner)
+ throws DataFormatException
{
- // Read and store the mods, in encoded form
- // all the remaining bytes but the terminating 0 */
- int length = in.length - pos - 1;
- encodedMods = new byte[length];
- try
- {
- System.arraycopy(in, pos, encodedMods, 0, length);
- } catch (IndexOutOfBoundsException e)
- {
- throw new DataFormatException(e.getMessage());
- } catch (ArrayStoreException e)
- {
- throw new DataFormatException(e.getMessage());
- } catch (NullPointerException e)
- {
- throw new DataFormatException(e.getMessage());
- }
+ encodedMods = scanner.remainingBytes();
}
- private void decodeBody_V4(byte[] in, int pos)
- throws DataFormatException, UnsupportedEncodingException
+ private void decodeBody_V4(ByteArrayScanner scanner)
+ throws DataFormatException
{
- // Read mods len
- int length = getNextLength(in, pos);
- int modsLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
- pos += length + 1;
+ final int modsLen = scanner.nextIntUTF8();
+ this.encodedMods = scanner.nextByteArray(modsLen);
- // Read/Don't decode mods
- this.encodedMods = new byte[modsLen];
- try
- {
- System.arraycopy(in, pos, encodedMods, 0, modsLen);
- } catch (IndexOutOfBoundsException e)
- {
- throw new DataFormatException(e.getMessage());
- } catch (ArrayStoreException e)
- {
- throw new DataFormatException(e.getMessage());
- } catch (NullPointerException e)
- {
- throw new DataFormatException(e.getMessage());
- }
- pos += modsLen + 1;
-
- // Read ecl attr len
- length = getNextLength(in, pos);
- int eclAttrLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
- pos += length + 1;
-
- // Read/Don't decode entry attributes
- encodedEclIncludes = new byte[eclAttrLen];
- try
- {
- System.arraycopy(in, pos, encodedEclIncludes, 0, eclAttrLen);
- } catch (IndexOutOfBoundsException e)
- {
- throw new DataFormatException(e.getMessage());
- } catch (ArrayStoreException e)
- {
- throw new DataFormatException(e.getMessage());
- } catch (NullPointerException e)
- {
- throw new DataFormatException(e.getMessage());
- }
+ final int eclAttrLen = scanner.nextIntUTF8();
+ encodedEclIncludes = scanner.nextByteArray(eclAttrLen);
}
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
index a25690c..470ed18 100644
--- a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -27,21 +27,16 @@
package org.opends.server.replication.protocol;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.zip.DataFormatException;
-import org.opends.server.protocols.asn1.ASN1;
import org.opends.server.protocols.asn1.ASN1Reader;
import org.opends.server.protocols.asn1.ASN1Writer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
-import org.opends.server.types.ByteSequenceReader;
-import org.opends.server.types.ByteString;
-import org.opends.server.types.ByteStringBuilder;
/**
* This message is part of the replication protocol.
@@ -69,7 +64,7 @@
* first missing change for each LDAP server connected to a Replication
* Server.
*/
- static class ServerData
+ private static class ServerData
{
private ServerState state;
private long approxFirstMissingDate;
@@ -79,7 +74,7 @@
* Data structure to manage the state of this replication server
* and the state information for the servers connected to it.
*/
- static class SubTopoMonitorData
+ private static class SubTopoMonitorData
{
/** This replication server DbState. */
private ServerState replServerDbState;
@@ -91,7 +86,7 @@
new HashMap<Integer, ServerData>();
}
- private SubTopoMonitorData data = new SubTopoMonitorData();
+ private final SubTopoMonitorData data = new SubTopoMonitorData();
/**
* Creates a new MonitorMsg.
@@ -120,18 +115,22 @@
* @param state The server state.
* @param approxFirstMissingDate The approximation of the date
* of the older missing change. null when none.
- * @param isLDAP Specifies whether the server is a LS or a RS
+ * @param isLDAPServer Specifies whether the server is a DS or a RS
*/
public void setServerState(int serverId, ServerState state,
- long approxFirstMissingDate, boolean isLDAP)
+ long approxFirstMissingDate, boolean isLDAPServer)
{
- ServerData sd = new ServerData();
+ final ServerData sd = new ServerData();
sd.state = state;
sd.approxFirstMissingDate = approxFirstMissingDate;
- if (isLDAP)
+ if (isLDAPServer)
+ {
data.ldapStates.put(serverId, sd);
+ }
else
+ {
data.rsStates.put(serverId, sd);
+ }
}
/**
@@ -154,7 +153,6 @@
return data.rsStates.get(serverId).state;
}
-
/**
* Get the approximation of the date of the older missing change for the
* LDAP Server with the provided server Id.
@@ -185,69 +183,32 @@
* @throws DataFormatException If the byte array does not contain a valid
* encoded form of the ServerStartMessage.
*/
- public MonitorMsg(byte[] in, short version) throws DataFormatException
+ MonitorMsg(byte[] in, short version) throws DataFormatException
{
- ByteSequenceReader reader = ByteString.wrap(in).asReader();
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ if (scanner.nextByte() != MSG_TYPE_REPL_SERVER_MONITOR)
+ {
+ throw new DataFormatException("input is not a valid "
+ + getClass().getCanonicalName());
+ }
if (version == ProtocolVersion.REPLICATION_PROTOCOL_V1)
{
- try
- {
- /* first byte is the type */
- if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR)
- throw new DataFormatException("input is not a valid " +
- this.getClass().getCanonicalName());
- int pos = 1;
-
- // sender
- int length = getNextLength(in, pos);
- String senderIDString = new String(in, pos, length, "UTF-8");
- this.senderID = Integer.valueOf(senderIDString);
- pos += length +1;
-
- // destination
- length = getNextLength(in, pos);
- String destinationString = new String(in, pos, length, "UTF-8");
- this.destination = Integer.valueOf(destinationString);
- pos += length +1;
-
- reader.position(pos);
- }
- catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
+ this.senderID = scanner.nextIntUTF8();
+ this.destination = scanner.nextIntUTF8();
+ }
+ else if (version <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
+ {
+ this.senderID = scanner.nextShort();
+ this.destination = scanner.nextShort();
}
else
{
- if (reader.get() != MSG_TYPE_REPL_SERVER_MONITOR)
- throw new DataFormatException("input is not a valid " +
- this.getClass().getCanonicalName());
-
- /*
- * V4 and above uses integers for its serverIds while V2 and V3
- * use shorts.
- */
- if (version <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
- {
- // sender
- this.senderID = reader.getShort();
-
- // destination
- this.destination = reader.getShort();
- }
- else
- {
- // sender
- this.senderID = reader.getInt();
-
- // destination
- this.destination = reader.getInt();
- }
+ this.senderID = scanner.nextInt();
+ this.destination = scanner.nextInt();
}
-
- ASN1Reader asn1Reader = ASN1.getReader(reader);
+ ASN1Reader asn1Reader = scanner.getASN1Reader();
try
{
asn1Reader.readStartSequence();
@@ -297,13 +258,7 @@
else
{
// the next states are the server states
- ServerData sd = new ServerData();
- sd.state = newState;
- sd.approxFirstMissingDate = outime;
- if (isLDAPServer)
- data.ldapStates.put(serverId, sd);
- else
- data.rsStates.put(serverId, sd);
+ setServerState(serverId, newState, outime, isLDAPServer);
}
}
asn1Reader.readEndSequence();
@@ -312,39 +267,19 @@
}
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte[] getBytes(short protocolVersion)
{
try
{
- ByteStringBuilder byteBuilder = new ByteStringBuilder();
-
- if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
- {
- /* put the type of the operation */
- byteBuilder.append(MSG_TYPE_REPL_SERVER_MONITOR);
-
- /*
- * V4 and above uses integers for its serverIds while V2 and V3
- * use shorts.
- */
- if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- byteBuilder.append(senderID);
- byteBuilder.append(destination);
- }
- else
- {
- byteBuilder.append((short)senderID);
- byteBuilder.append((short)destination);
- }
- }
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ builder.append(MSG_TYPE_REPL_SERVER_MONITOR);
+ append(builder, senderID, protocolVersion);
+ append(builder, destination, protocolVersion);
/* Put the serverStates ... */
- ASN1Writer writer = ASN1.getWriter(byteBuilder);
+ ASN1Writer writer = builder.getASN1Writer();
writer.writeStartSequence();
{
/* first put the Replication Server state */
@@ -354,39 +289,18 @@
}
writer.writeEndSequence();
- // then the LDAP server data
+ // then the DS + RS server data
writeServerStates(protocolVersion, writer, false /* DS */);
-
- // then the RS server datas
writeServerStates(protocolVersion, writer, true /* RS */);
}
writer.writeEndSequence();
- if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
+ if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
{
- return byteBuilder.toByteArray();
+ // legacy coding mistake
+ builder.append((byte) 0);
}
- else
- {
- byte[] temp = byteBuilder.toByteArray();
-
- byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
- byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
-
- int length = 1 + 1 + senderBytes.length +
- 1 + destinationBytes.length + temp.length +1;
- byte[] resultByteArray = new byte[length];
-
- /* put the type of the operation */
- resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR;
- int pos = 1;
-
- pos = addByteArray(senderBytes, resultByteArray, pos);
- pos = addByteArray(destinationBytes, resultByteArray, pos);
- pos = addByteArray(temp, resultByteArray, pos);
-
- return resultByteArray;
- }
+ return builder.toByteArray();
}
catch (Exception e)
{
@@ -394,6 +308,23 @@
}
}
+ private void append(final ByteArrayBuilder builder, int data,
+ short protocolVersion)
+ {
+ if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
+ {
+ builder.appendUTF8(data);
+ }
+ else if (protocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
+ {
+ builder.append((short) data);
+ }
+ else // protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4
+ {
+ builder.append(data);
+ }
+ }
+
private void writeServerStates(short protocolVersion, ASN1Writer writer,
boolean writeRSStates) throws IOException
{
@@ -454,8 +385,6 @@
return data.rsStates.keySet().iterator();
}
-
-
/**
* Get the destination.
*
@@ -466,8 +395,6 @@
return destination;
}
-
-
/**
* Get the server ID of the server that sent this message.
*
@@ -478,15 +405,11 @@
return senderID;
}
-
-
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public String toString()
{
- StringBuilder stateS = new StringBuilder("\nRState:[");
+ final StringBuilder stateS = new StringBuilder("\nRState:[");
stateS.append(data.replServerDbState);
stateS.append("]");
@@ -502,10 +425,10 @@
stateS.append("\nRSStates:[");
for (Entry<Integer, ServerData> entry : data.rsStates.entrySet())
{
- ServerData sd = entry.getValue();
+ final ServerData sd = entry.getValue();
stateS.append("\n[RSState(").append(entry.getKey()).append(")=")
.append(sd.state).append("]").append(" afmd=")
- .append(sd.approxFirstMissingDate + "]");
+ .append(sd.approxFirstMissingDate).append("]");
}
return getClass().getCanonicalName() +
"[ sender=" + this.senderID +
diff --git a/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java b/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
index cfe790c..a0f0bca 100644
--- a/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
@@ -26,7 +26,6 @@
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
@@ -46,8 +45,6 @@
*/
private final int senderID;
-
-
/**
* Creates a message.
*
@@ -70,70 +67,30 @@
* @throws DataFormatException
* If the in does not contain a properly, encoded message.
*/
- public MonitorRequestMsg(byte[] in) throws DataFormatException
+ MonitorRequestMsg(byte[] in) throws DataFormatException
{
- try
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ final byte msgType = scanner.nextByte();
+ if (msgType != MSG_TYPE_REPL_SERVER_MONITOR_REQUEST)
{
- // First byte is the type
- if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR_REQUEST)
- throw new DataFormatException("input is not a valid "
- + this.getClass().getCanonicalName());
- int pos = 1;
-
- // sender
- int length = getNextLength(in, pos);
- String senderString = new String(in, pos, length, "UTF-8");
- this.senderID = Integer.valueOf(senderString);
- pos += length + 1;
-
- // destination
- length = getNextLength(in, pos);
- String destinationString = new String(in, pos, length, "UTF-8");
- this.destination = Integer.valueOf(destinationString);
- pos += length + 1;
-
+ throw new DataFormatException("input is not a valid "
+ + getClass().getCanonicalName());
}
- catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
+ this.senderID = scanner.nextIntUTF8();
+ this.destination = scanner.nextIntUTF8();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte[] getBytes(short protocolVersion)
{
- try
- {
- byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
- byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
-
- int length = 1 + senderBytes.length + 1 + destinationBytes.length + 1;
-
- byte[] resultByteArray = new byte[length];
-
- /* put the type of the operation */
- resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR_REQUEST;
- int pos = 1;
-
- /* put the sender */
- pos = addByteArray(senderBytes, resultByteArray, pos);
-
- /* put the destination */
- pos = addByteArray(destinationBytes, resultByteArray, pos);
-
- return resultByteArray;
- }
- catch (UnsupportedEncodingException e)
- {
- return null;
- }
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ builder.append(MSG_TYPE_REPL_SERVER_MONITOR_REQUEST);
+ builder.appendUTF8(senderID);
+ builder.appendUTF8(destination);
+ return builder.toByteArray();
}
-
-
/**
* Get the destination.
*
@@ -144,8 +101,6 @@
return destination;
}
-
-
/**
* Get the server ID of the server that sent this message.
*
@@ -156,13 +111,12 @@
return senderID;
}
-
-
/**
* Returns a string representation of the message.
*
* @return the string representation of this message.
*/
+ @Override
public String toString()
{
return "[" + getClass().getCanonicalName() + " sender=" + senderID
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java b/opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java
index f0edfef..ed64711 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java
@@ -22,16 +22,14 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS.
+ * Portions copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.DN;
-import org.opends.server.types.DirectoryException;
/**
* Message sent by a replication server to a directory server in reply to the
@@ -39,17 +37,17 @@
*/
public class ReplServerStartDSMsg extends StartMsg
{
- private int serverId;
- private String serverURL;
- private DN baseDN;
- private int windowSize;
- private ServerState serverState;
+ private final int serverId;
+ private final String serverURL;
+ private final DN baseDN;
+ private final int windowSize;
+ private final ServerState serverState;
/**
* Whether to continue using SSL to encrypt messages after the start
* messages have been exchanged.
*/
- private boolean sslEncryption;
+ private final boolean sslEncryption;
/**
* Threshold value used by the RS to determine if a DS must be put in
@@ -61,12 +59,12 @@
/**
* The weight affected to the replication server.
*/
- private int weight = -1;
+ private final int weight;
/**
* Number of currently connected DS to the replication server.
*/
- private int connectedDSNumber = -1;
+ private final int connectedDSNumber;
/**
* Create a ReplServerStartDSMsg.
@@ -115,100 +113,25 @@
* @throws DataFormatException If the in does not contain a properly
* encoded ReplServerStartDSMsg.
*/
- public ReplServerStartDSMsg(byte[] in) throws DataFormatException
+ ReplServerStartDSMsg(byte[] in) throws DataFormatException
{
- byte[] allowedPduTypes = new byte[1];
- allowedPduTypes[0] = MSG_TYPE_REPL_SERVER_START_DS;
- headerLength = decodeHeader(allowedPduTypes, in);
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ decodeHeader(scanner, MSG_TYPE_REPL_SERVER_START_DS);
- try
- {
- /* The ReplServerStartDSMsg payload is stored in the form :
- * <baseDN><serverId><serverURL><windowSize><sslEncryption>
- * <degradedStatusThreshold><weight><connectedDSNumber>
- * <serverState>
- */
-
- /* first bytes are the header */
- int pos = headerLength;
-
- /* read the dn
- * first calculate the length then construct the string
- */
- int length = getNextLength(in, pos);
- baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /*
- * read the ServerId
- */
- length = getNextLength(in, pos);
- String serverIdString = new String(in, pos, length, "UTF-8");
- serverId = Integer.valueOf(serverIdString);
- pos += length +1;
-
- /*
- * read the ServerURL
- */
- length = getNextLength(in, pos);
- serverURL = new String(in, pos, length, "UTF-8");
- pos += length +1;
-
- /*
- * read the window size
- */
- length = getNextLength(in, pos);
- windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /*
- * read the sslEncryption setting
- */
- length = getNextLength(in, pos);
- sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /**
- * read the degraded status threshold
- */
- length = getNextLength(in, pos);
- degradedStatusThreshold =
- Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length + 1;
-
- /*
- * read the weight
- */
- length = getNextLength(in, pos);
- String weightString = new String(in, pos, length, "UTF-8");
- weight = Integer.valueOf(weightString);
- pos += length +1;
-
- /*
- * read the connected DS number
- */
- length = getNextLength(in, pos);
- String connectedDSNumberString = new String(in, pos, length, "UTF-8");
- connectedDSNumber = Integer.valueOf(connectedDSNumberString);
- pos += length +1;
-
- // Read the ServerState
- // Caution: ServerState MUST be the last field. Because ServerState can
- // contain null character (string termination of serverid string ..) it
- // cannot be decoded using getNextLength() like the other fields. The
- // only way is to rely on the end of the input buffer : and that forces
- // the ServerState to be the last. This should be changed and we want to
- // have more than one ServerState field.
- serverState = new ServerState(in, pos, in.length - 1);
- }
- catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
- catch (DirectoryException e)
- {
- throw new DataFormatException(e.getLocalizedMessage());
- }
+ /* The ReplServerStartDSMsg payload is stored in the form :
+ * <baseDN><serverId><serverURL><windowSize><sslEncryption>
+ * <degradedStatusThreshold><weight><connectedDSNumber>
+ * <serverState>
+ */
+ baseDN = scanner.nextDN();
+ serverId = scanner.nextIntUTF8();
+ serverURL = scanner.nextString();
+ windowSize = scanner.nextIntUTF8();
+ sslEncryption = Boolean.valueOf(scanner.nextString());//FIXME
+ degradedStatusThreshold =scanner.nextIntUTF8();
+ weight = scanner.nextIntUTF8();
+ connectedDSNumber = scanner.nextIntUTF8();
+ serverState = scanner.nextServerState();
}
/**
@@ -248,72 +171,28 @@
return this.serverState;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
- public byte[] getBytes(short sessionProtocolVersion)
- throws UnsupportedEncodingException
+ public byte[] getBytes(short protocolVersion)
{
/* The ReplServerStartDSMsg is stored in the form :
* <operation type><baseDN><serverId><serverURL><windowSize><sslEncryption>
* <degradedStatusThreshold><weight><connectedDSNumber>
* <serverState>
*/
- byte[] byteDn = baseDN.toNormalizedString().getBytes("UTF-8");
- byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
- byte[] byteServerUrl = serverURL.getBytes("UTF-8");
- byte[] byteServerState = serverState.getBytes();
- byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
- byte[] byteSSLEncryption =
- String.valueOf(sslEncryption).getBytes("UTF-8");
- byte[] byteDegradedStatusThreshold =
- String.valueOf(degradedStatusThreshold).getBytes("UTF-8");
- byte[] byteWeight =
- String.valueOf(weight).getBytes("UTF-8");
- byte[] byteConnectedDSNumber =
- String.valueOf(connectedDSNumber).getBytes("UTF-8");
-
- int length = byteDn.length + 1 + byteServerId.length + 1 +
- byteServerUrl.length + 1 + byteWindowSize.length + 1 +
- byteSSLEncryption.length + 1 + byteDegradedStatusThreshold.length + 1 +
- byteWeight.length + 1 + byteConnectedDSNumber.length + 1 +
- byteServerState.length + 1;
-
- /* encode the header in a byte[] large enough */
- byte resultByteArray[] = encodeHeader(MSG_TYPE_REPL_SERVER_START_DS,
- length, sessionProtocolVersion);
-
- int pos = headerLength;
-
- /* put the baseDN and a terminating 0 */
- pos = addByteArray(byteDn, resultByteArray, pos);
-
- /* put the ServerId */
- pos = addByteArray(byteServerId, resultByteArray, pos);
-
- /* put the ServerURL */
- pos = addByteArray(byteServerUrl, resultByteArray, pos);
-
- /* put the window size */
- pos = addByteArray(byteWindowSize, resultByteArray, pos);
-
- /* put the SSL Encryption setting */
- pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
-
- /* put the degraded status threshold */
- pos = addByteArray(byteDegradedStatusThreshold, resultByteArray, pos);
-
- /* put the weight */
- pos = addByteArray(byteWeight, resultByteArray, pos);
-
- /* put the connected DS number */
- pos = addByteArray(byteConnectedDSNumber, resultByteArray, pos);
-
- /* put the ServerState */
- pos = addByteArray(byteServerState, resultByteArray, pos);
-
- return resultByteArray;
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ encodeHeader(MSG_TYPE_REPL_SERVER_START_DS, builder, protocolVersion);
+ builder.append(baseDN);
+ builder.appendUTF8(serverId);
+ builder.append(serverURL);
+ builder.appendUTF8(windowSize);
+ builder.append(Boolean.toString(sslEncryption));
+ builder.appendUTF8(degradedStatusThreshold);
+ builder.appendUTF8(weight);
+ builder.appendUTF8(connectedDSNumber);
+ // Caution: ServerState MUST be the last field.
+ builder.append(serverState);
+ return builder.toByteArray();
}
/**
@@ -356,9 +235,7 @@
this.degradedStatusThreshold = degradedStatusThreshold;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public String toString()
{
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java b/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
index 5938813..5a58fd6 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
@@ -22,16 +22,14 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS.
+ * Portions copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.DN;
-import org.opends.server.types.DirectoryException;
/**
* Message sent by a replication server to another replication server
@@ -39,17 +37,17 @@
*/
public class ReplServerStartMsg extends StartMsg
{
- private Integer serverId;
- private String serverURL;
- private DN baseDN;
- private int windowSize;
- private ServerState serverState;
+ private final int serverId;
+ private final String serverURL;
+ private final DN baseDN;
+ private final int windowSize;
+ private final ServerState serverState;
/**
* Whether to continue using SSL to encrypt messages after the start
* messages have been exchanged.
*/
- private boolean sslEncryption;
+ private final boolean sslEncryption;
/**
* NOTE: Starting from protocol V4, we introduce a dedicated PDU for answering
@@ -106,166 +104,28 @@
* @throws DataFormatException If the in does not contain a properly
* encoded ReplServerStartMsg.
*/
- public ReplServerStartMsg(byte[] in) throws DataFormatException
+ ReplServerStartMsg(byte[] in) throws DataFormatException
{
- byte[] allowedPduTypes = new byte[2];
- allowedPduTypes[0] = MSG_TYPE_REPL_SERVER_START;
- allowedPduTypes[1] = MSG_TYPE_REPL_SERVER_START_V1;
- headerLength = decodeHeader(allowedPduTypes, in);
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ decodeHeader(scanner,
+ MSG_TYPE_REPL_SERVER_START, MSG_TYPE_REPL_SERVER_START_V1);
- // Protocol version has been read as part of the header:
- // decode the body according to the protocol version read in the header
- switch(protocolVersion)
+ /* The ReplServerStartMsg payload is stored in the form :
+ * <baseDN><serverId><serverURL><windowSize><sslEncryption>
+ * <degradedStatusThreshold><serverState>
+ */
+ baseDN = scanner.nextDN();
+ serverId = scanner.nextIntUTF8();
+ serverURL = scanner.nextString();
+ windowSize = scanner.nextIntUTF8();
+ sslEncryption = Boolean.valueOf(scanner.nextString());
+
+ if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
{
- case ProtocolVersion.REPLICATION_PROTOCOL_V1:
- decodeBody_V1(in, headerLength);
- return;
+ degradedStatusThreshold = scanner.nextIntUTF8();
}
- try
- {
- /* The ReplServerStartMsg payload is stored in the form :
- * <baseDN><serverId><serverURL><windowSize><sslEncryption>
- * <degradedStatusThreshold><serverState>
- */
-
- /* first bytes are the header */
- int pos = headerLength;
-
- /* read the dn
- * first calculate the length then construct the string
- */
- int length = getNextLength(in, pos);
- baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /*
- * read the ServerId
- */
- length = getNextLength(in, pos);
- String serverIdString = new String(in, pos, length, "UTF-8");
- serverId = Integer.valueOf(serverIdString);
- pos += length +1;
-
- /*
- * read the ServerURL
- */
- length = getNextLength(in, pos);
- serverURL = new String(in, pos, length, "UTF-8");
- pos += length +1;
-
- /*
- * read the window size
- */
- length = getNextLength(in, pos);
- windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /*
- * read the sslEncryption setting
- */
- length = getNextLength(in, pos);
- sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /**
- * read the degraded status threshold
- */
- length = getNextLength(in, pos);
- degradedStatusThreshold =
- Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length + 1;
-
- // Read the ServerState
- // Caution: ServerState MUST be the last field. Because ServerState can
- // contain null character (string termination of serverid string ..) it
- // cannot be decoded using getNextLength() like the other fields. The
- // only way is to rely on the end of the input buffer : and that forces
- // the ServerState to be the last. This should be changed and we want to
- // have more than one ServerState field.
- serverState = new ServerState(in, pos, in.length - 1);
- }
- catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
- catch (DirectoryException e)
- {
- throw new DataFormatException(e.getLocalizedMessage());
- }
- }
-
- /**
- * Decodes the body of a just received ReplServerStartMsg. The body is in the
- * passed array, and starts at the provided location. This is for a PDU
- * encoded in V1 protocol version.
- * @param in A byte array containing the body for the ReplServerStartMsg
- * @param pos The position in the array where the decoding should start
- * @throws DataFormatException If the in does not contain a properly
- * encoded ReplServerStartMsg.
- */
- public void decodeBody_V1(byte[] in, int pos) throws DataFormatException
- {
- try
- {
- /* The ReplServerStartMsg payload is stored in the form :
- * <baseDN><serverId><serverURL><windowSize><sslEncryption>
- * <serverState>
- */
-
- /* read the dn
- * first calculate the length then construct the string
- */
- int length = getNextLength(in, pos);
- baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /*
- * read the ServerId
- */
- length = getNextLength(in, pos);
- String serverIdString = new String(in, pos, length, "UTF-8");
- serverId = Integer.valueOf(serverIdString);
- pos += length +1;
-
- /*
- * read the ServerURL
- */
- length = getNextLength(in, pos);
- serverURL = new String(in, pos, length, "UTF-8");
- pos += length +1;
-
- /*
- * read the window size
- */
- length = getNextLength(in, pos);
- windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /*
- * read the sslEncryption setting
- */
- length = getNextLength(in, pos);
- sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- // Read the ServerState
- // Caution: ServerState MUST be the last field. Because ServerState can
- // contain null character (string termination of serverid string ..) it
- // cannot be decoded using getNextLength() like the other fields. The
- // only way is to rely on the end of the input buffer : and that forces
- // the ServerState to be the last. This should be changed and we want to
- // have more than one ServerState field.
- serverState = new ServerState(in, pos, in.length - 1);
- }
- catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
- catch (DirectoryException e)
- {
- throw new DataFormatException(e.getLocalizedMessage());
- }
+ serverState = scanner.nextServerState();
}
/**
@@ -305,69 +165,43 @@
return this.serverState;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
- public byte[] getBytes(short sessionProtocolVersion)
- throws UnsupportedEncodingException
+ public byte[] getBytes(short protocolVersion)
{
- // If an older version requested, encode in the requested way
- switch(sessionProtocolVersion)
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
{
- case ProtocolVersion.REPLICATION_PROTOCOL_V1:
- return getBytes_V1();
+ /*
+ * The ReplServerStartMessage is stored in the form :
+ * <operation type><basedn><serverid><serverURL><windowsize><serverState>
+ */
+ encodeHeader_V1(MSG_TYPE_REPL_SERVER_START_V1, builder);
+ builder.append(baseDN);
+ builder.appendUTF8(serverId);
+ builder.append(serverURL);
+ builder.appendUTF8(windowSize);
+ builder.append(Boolean.toString(sslEncryption));
+ // Caution: ServerState MUST be the last field.
+ builder.append(serverState);
}
-
- /* The ReplServerStartMsg is stored in the form :
- * <operation type><baseDN><serverId><serverURL><windowSize><sslEncryption>
- * <degradedStatusThreshold><serverState>
- */
-
- byte[] byteDn = baseDN.toNormalizedString().getBytes("UTF-8");
- byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
- byte[] byteServerUrl = serverURL.getBytes("UTF-8");
- byte[] byteServerState = serverState.getBytes();
- byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
- byte[] byteSSLEncryption =
- String.valueOf(sslEncryption).getBytes("UTF-8");
- byte[] byteDegradedStatusThreshold =
- String.valueOf(degradedStatusThreshold).getBytes("UTF-8");
-
- int length = byteDn.length + 1 + byteServerId.length + 1 +
- byteServerUrl.length + 1 + byteWindowSize.length + 1 +
- byteSSLEncryption.length + 1 +
- byteDegradedStatusThreshold.length + 1 +
- byteServerState.length + 1;
-
- /* encode the header in a byte[] large enough */
- byte resultByteArray[] = encodeHeader(MSG_TYPE_REPL_SERVER_START, length,
- sessionProtocolVersion);
-
- int pos = headerLength;
-
- /* put the baseDN and a terminating 0 */
- pos = addByteArray(byteDn, resultByteArray, pos);
-
- /* put the ServerId */
- pos = addByteArray(byteServerId, resultByteArray, pos);
-
- /* put the ServerURL */
- pos = addByteArray(byteServerUrl, resultByteArray, pos);
-
- /* put the window size */
- pos = addByteArray(byteWindowSize, resultByteArray, pos);
-
- /* put the SSL Encryption setting */
- pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
-
- /* put the degraded status threshold */
- pos = addByteArray(byteDegradedStatusThreshold, resultByteArray, pos);
-
- /* put the ServerState */
- pos = addByteArray(byteServerState, resultByteArray, pos);
-
- return resultByteArray;
+ else
+ {
+ /* The ReplServerStartMsg is stored in the form :
+ * <operation type><baseDN><serverId><serverURL><windowSize><sslEncryption>
+ * <degradedStatusThreshold><serverState>
+ */
+ encodeHeader(MSG_TYPE_REPL_SERVER_START, builder, protocolVersion);
+ builder.append(baseDN);
+ builder.appendUTF8(serverId);
+ builder.append(serverURL);
+ builder.appendUTF8(windowSize);
+ builder.append(Boolean.toString(sslEncryption));
+ builder.appendUTF8(degradedStatusThreshold);
+ // Caution: ServerState MUST be the last field.
+ builder.append(serverState);
+ }
+ return builder.toByteArray();
}
/**
@@ -410,9 +244,7 @@
this.degradedStatusThreshold = degradedStatusThreshold;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public String toString()
{
@@ -428,64 +260,4 @@
"\ndegradedStatusThreshold: " + degradedStatusThreshold +
"\nwindowSize: " + windowSize;
}
-
- /**
- * Get the byte array representation of this Message. This uses the version
- * 1 of the replication protocol (used for compatibility purpose).
- *
- * @return The byte array representation of this Message.
- *
- * @throws UnsupportedEncodingException When the encoding of the message
- * failed because the UTF-8 encoding is not supported.
- */
- public byte[] getBytes_V1() throws UnsupportedEncodingException
- {
- /*
- * The ReplServerStartMessage is stored in the form :
- * <operation type><basedn><serverid><serverURL><windowsize><serverState>
- */
- try {
- byte[] byteDn = baseDN.toNormalizedString().getBytes("UTF-8");
- byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
- byte[] byteServerUrl = serverURL.getBytes("UTF-8");
- byte[] byteServerState = serverState.getBytes();
- byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
- byte[] byteSSLEncryption =
- String.valueOf(sslEncryption).getBytes("UTF-8");
-
- int length = byteDn.length + 1 + byteServerId.length + 1 +
- byteServerUrl.length + 1 + byteWindowSize.length + 1 +
- byteSSLEncryption.length + 1 +
- byteServerState.length + 1;
-
- /* encode the header in a byte[] large enough */
- byte resultByteArray[] = encodeHeader_V1(MSG_TYPE_REPL_SERVER_START_V1,
- length);
- int pos = headerLength;
-
- /* put the baseDN and a terminating 0 */
- pos = addByteArray(byteDn, resultByteArray, pos);
-
- /* put the ServerId */
- pos = addByteArray(byteServerId, resultByteArray, pos);
-
- /* put the ServerURL */
- pos = addByteArray(byteServerUrl, resultByteArray, pos);
-
- /* put the window size */
- pos = addByteArray(byteWindowSize, resultByteArray, pos);
-
- /* put the SSL Encryption setting */
- pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
-
- /* put the ServerState */
- pos = addByteArray(byteServerState, resultByteArray, pos);
-
- return resultByteArray;
- }
- catch (UnsupportedEncodingException e)
- {
- return null;
- }
- }
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java b/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
index 4d69586..d828ce3 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -22,11 +22,10 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2013 ForgeRock AS.
+ * Portions copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
@@ -106,15 +105,8 @@
* The protocol version to use for serialization. The version should
* normally be older than the current one.
* @return The encoded PDU.
- * @throws UnsupportedEncodingException
- * When the encoding of the message failed because the UTF-8
- * encoding is not supported or the requested protocol version to
- * use is not supported by this PDU.
*/
- public abstract byte[] getBytes(short protocolVersion)
- throws UnsupportedEncodingException;
-
-
+ public abstract byte[] getBytes(short protocolVersion);
/**
* Generates a ReplicationMsg from its encoded form. This un-serialization is
@@ -128,15 +120,12 @@
* @return The generated SynchronizationMessage.
* @throws DataFormatException
* If the encoded form was not a valid msg.
- * @throws UnsupportedEncodingException
- * If UTF8 is not supported.
* @throws NotSupportedOldVersionPDUException
* If the PDU is part of an old protocol version and we do not
* support it.
*/
public static ReplicationMsg generateMsg(byte[] buffer, short protocolVersion)
- throws DataFormatException, UnsupportedEncodingException,
- NotSupportedOldVersionPDUException
+ throws DataFormatException, NotSupportedOldVersionPDUException
{
switch (buffer[0])
{
@@ -214,51 +203,4 @@
throw new DataFormatException("received message with unknown type");
}
}
-
- /**
- * Concatenate the tail byte array into the resultByteArray.
- * The resultByteArray must be large enough before calling this method.
- *
- * @param tail the byte array to concatenate.
- * @param resultByteArray The byte array to concatenate to.
- * @param pos the position where to concatenate.
- * @return the next position to use in the resultByteArray.
- */
- protected static int addByteArray(byte[] tail, byte[] resultByteArray,
- int pos)
- {
- for (int i=0; i<tail.length; i++,pos++)
- {
- resultByteArray[pos] = tail[i];
- }
- resultByteArray[pos++] = 0;
- return pos;
- }
-
-
-
- /**
- * Get the length of the next String encoded in the in byte array.
- *
- * @param in
- * the byte array where to calculate the string.
- * @param pos
- * the position where to start from in the byte array.
- * @return the length of the next string.
- * @throws DataFormatException
- * If the byte array does not end with null.
- */
- protected static int getNextLength(byte[] in, int pos)
- throws DataFormatException
- {
- int offset = pos;
- int length = 0;
- while (in[offset++] != 0)
- {
- if (offset >= in.length)
- throw new DataFormatException("byte[] is not a valid msg");
- length++;
- }
- return length;
- }
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ResetGenerationIdMsg.java b/opends/src/server/org/opends/server/replication/protocol/ResetGenerationIdMsg.java
index 44b9515..657fcf9 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ResetGenerationIdMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ResetGenerationIdMsg.java
@@ -22,23 +22,19 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS.
+ * Portions copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
-
/**
* This message is used by an LDAP server to communicate to the topology
* that the generation must be reset for the domain.
*/
public class ResetGenerationIdMsg extends ReplicationMsg
{
- private long generationId;
+ private final long generationId;
/**
* Creates a new message.
@@ -57,52 +53,25 @@
* @throws DataFormatException If the byte array does not contain a valid
* encoded form of the WindowMessage.
*/
- public ResetGenerationIdMsg(byte[] in) throws DataFormatException
+ ResetGenerationIdMsg(byte[] in) throws DataFormatException
{
- try
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ if (scanner.nextByte() != MSG_TYPE_RESET_GENERATION_ID)
{
- if (in[0] != MSG_TYPE_RESET_GENERATION_ID)
- throw new
- DataFormatException("input is not a valid GenerationId Message");
-
- int pos = 1;
-
- /* read the generationId */
- int length = getNextLength(in, pos);
- generationId = Long.valueOf(new String(in, pos, length,
- "UTF-8"));
- pos += length +1;
- } catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ throw new DataFormatException(
+ "input is not a valid GenerationId Message");
}
-
+ generationId = scanner.nextLongUTF8();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte[] getBytes(short protocolVersion)
{
- try
- {
- ByteArrayOutputStream oStream = new ByteArrayOutputStream();
-
- /* Put the message type */
- oStream.write(MSG_TYPE_RESET_GENERATION_ID);
-
- // Put the generationId
- oStream.write(String.valueOf(generationId).getBytes("UTF-8"));
- oStream.write(0);
-
- return oStream.toByteArray();
- }
- catch (IOException e)
- {
- // never happens
- return null;
- }
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ builder.append(MSG_TYPE_RESET_GENERATION_ID);
+ builder.appendUTF8(generationId);
+ return builder.toByteArray();
}
/**
@@ -115,9 +84,7 @@
return this.generationId;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public String toString()
{
diff --git a/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java b/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
index 4d514fb..d66aba2 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
@@ -22,11 +22,10 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
- * Portions Copyright 2013 ForgeRock AS.
+ * Portions Copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.ServerState;
@@ -38,26 +37,25 @@
*/
public class ServerStartECLMsg extends StartMsg
{
- private String serverURL;
- private int maxReceiveQueue;
- private int maxSendQueue;
- private int maxReceiveDelay;
- private int maxSendDelay;
- private int windowSize;
- private ServerState serverState = null;
+ private final String serverURL;
+ private final int maxReceiveQueue;
+ private final int maxSendQueue;
+ private final int maxReceiveDelay;
+ private final int maxSendDelay;
+ private final int windowSize;
+ private final ServerState serverState;
/**
* The time in milliseconds between heartbeats from the replication
* server. Zero means heartbeats are off.
*/
- private long heartbeatInterval = 0;
+ private final long heartbeatInterval;
/**
* Whether to continue using SSL to encrypt messages after the start
* messages have been exchanged.
*/
-
- private boolean sslEncryption;
+ private final boolean sslEncryption;
/**
* Creates a new ServerStartMsg. This message is to be sent by an LDAP
@@ -108,86 +106,21 @@
* @throws DataFormatException If the byte array does not contain a valid
* encoded form of the ServerStartMsg.
*/
- public ServerStartECLMsg(byte[] in) throws DataFormatException
+ ServerStartECLMsg(byte[] in) throws DataFormatException
{
- byte[] allowedPduTypes = new byte[1];
- allowedPduTypes[0] = MSG_TYPE_START_ECL;
- headerLength = decodeHeader(allowedPduTypes, in);
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ decodeHeader(scanner, MSG_TYPE_START_ECL);
- try
- {
- /* first bytes are the header */
- int pos = headerLength;
-
- /*
- * read the ServerURL
- */
- int length = getNextLength(in, pos);
- serverURL = new String(in, pos, length, "UTF-8");
- pos += length +1;
-
- /*
- * read the maxReceiveDelay
- */
- length = getNextLength(in, pos);
- maxReceiveDelay = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /*
- * read the maxReceiveQueue
- */
- length = getNextLength(in, pos);
- maxReceiveQueue = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /*
- * read the maxSendDelay
- */
- length = getNextLength(in, pos);
- maxSendDelay = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /*
- * read the maxSendQueue
- */
- length = getNextLength(in, pos);
- maxSendQueue = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /*
- * read the windowSize
- */
- length = getNextLength(in, pos);
- windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /*
- * read the heartbeatInterval
- */
- length = getNextLength(in, pos);
- heartbeatInterval = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /*
- * read the sslEncryption setting
- */
- length = getNextLength(in, pos);
- sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- // Read the ServerState
- // Caution: ServerState MUST be the last field. Because ServerState can
- // contain null character (string termination of sererid string ..) it
- // cannot be decoded using getNextLength() like the other fields. The
- // only way is to rely on the end of the input buffer : and that forces
- // the ServerState to be the last. This should be changed and we want to
- // have more than one ServerState field.
- serverState = new ServerState(in, pos, in.length - 1);
-
- } catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
+ serverURL = scanner.nextString();
+ maxReceiveDelay = scanner.nextIntUTF8();
+ maxReceiveQueue = scanner.nextIntUTF8();
+ maxSendDelay = scanner.nextIntUTF8();
+ maxSendQueue = scanner.nextIntUTF8();
+ windowSize = scanner.nextIntUTF8();
+ heartbeatInterval = scanner.nextIntUTF8();
+ // FIXME awful encoding
+ sslEncryption = Boolean.valueOf(scanner.nextString());
+ serverState = scanner.nextServerState();
}
/**
@@ -244,69 +177,24 @@
return serverState;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte[] getBytes(short sessionProtocolVersion)
{
- try {
- byte[] byteServerUrl = serverURL.getBytes("UTF-8");
- byte[] byteMaxRecvDelay =
- String.valueOf(maxReceiveDelay).getBytes("UTF-8");
- byte[] byteMaxRecvQueue =
- String.valueOf(maxReceiveQueue).getBytes("UTF-8");
- byte[] byteMaxSendDelay =
- String.valueOf(maxSendDelay).getBytes("UTF-8");
- byte[] byteMaxSendQueue =
- String.valueOf(maxSendQueue).getBytes("UTF-8");
- byte[] byteWindowSize =
- String.valueOf(windowSize).getBytes("UTF-8");
- byte[] byteHeartbeatInterval =
- String.valueOf(heartbeatInterval).getBytes("UTF-8");
- byte[] byteSSLEncryption =
- String.valueOf(sslEncryption).getBytes("UTF-8");
- byte[] byteServerState = serverState.getBytes();
-
- int length = byteServerUrl.length + 1 +
- byteMaxRecvDelay.length + 1 +
- byteMaxRecvQueue.length + 1 +
- byteMaxSendDelay.length + 1 +
- byteMaxSendQueue.length + 1 +
- byteWindowSize.length + 1 +
- byteHeartbeatInterval.length + 1 +
- byteSSLEncryption.length + 1 +
- byteServerState.length + 1;
-
- /* encode the header in a byte[] large enough to also contain the mods */
- byte resultByteArray[] = encodeHeader(MSG_TYPE_START_ECL, length,
- sessionProtocolVersion);
- int pos = headerLength;
-
- pos = addByteArray(byteServerUrl, resultByteArray, pos);
-
- pos = addByteArray(byteMaxRecvDelay, resultByteArray, pos);
-
- pos = addByteArray(byteMaxRecvQueue, resultByteArray, pos);
-
- pos = addByteArray(byteMaxSendDelay, resultByteArray, pos);
-
- pos = addByteArray(byteMaxSendQueue, resultByteArray, pos);
-
- pos = addByteArray(byteWindowSize, resultByteArray, pos);
-
- pos = addByteArray(byteHeartbeatInterval, resultByteArray, pos);
-
- pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
-
- pos = addByteArray(byteServerState, resultByteArray, pos);
-
- return resultByteArray;
- }
- catch (UnsupportedEncodingException e)
- {
- return null;
- }
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ encodeHeader(MSG_TYPE_START_ECL, builder, sessionProtocolVersion);
+ builder.append(serverURL);
+ builder.appendUTF8(maxReceiveDelay);
+ builder.appendUTF8(maxReceiveQueue);
+ builder.appendUTF8(maxSendDelay);
+ builder.appendUTF8(maxSendQueue);
+ builder.appendUTF8(windowSize);
+ builder.appendUTF8(heartbeatInterval);
+ // FIXME awful encoding
+ builder.append(Boolean.toString(sslEncryption));
+ // Caution: ServerState MUST be the last field.
+ builder.append(serverState);
+ return builder.toByteArray();
}
/**
@@ -343,13 +231,11 @@
return sslEncryption;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public String toString()
{
- return this.getClass().getCanonicalName() + " content: " +
+ return getClass().getCanonicalName() + " content: " +
"\nprotocolVersion: " + protocolVersion +
"\ngenerationId: " + generationId +
"\ngroupId: " + groupId +
diff --git a/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java b/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
index 65a319f..f6870dd 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
@@ -22,16 +22,14 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2013 ForgeRock AS.
+ * Portions Copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.DN;
-import org.opends.server.types.DirectoryException;
/**
* This message is used by LDAP server when they first connect.
@@ -40,27 +38,28 @@
*/
public class ServerStartMsg extends StartMsg
{
- private int serverId; // Id of the LDAP server that sent this message
- private String serverURL;
- private DN baseDN;
- private int maxReceiveQueue;
- private int maxSendQueue;
- private int maxReceiveDelay;
- private int maxSendDelay;
- private int windowSize;
- private ServerState serverState = null;
+ /** Id of the LDAP server that sent this message */
+ private final int serverId;
+ private final String serverURL;
+ private final DN baseDN;
+ private final int maxReceiveQueue;
+ private final int maxSendQueue;
+ private final int maxReceiveDelay;
+ private final int maxSendDelay;
+ private final int windowSize;
+ private final ServerState serverState;
/**
* The time in milliseconds between heartbeats from the replication
* server. Zero means heartbeats are off.
*/
- private long heartbeatInterval = 0;
+ private final long heartbeatInterval;
/**
* Whether to continue using SSL to encrypt messages after the start
* messages have been exchanged.
*/
- private boolean sslEncryption;
+ private final boolean sslEncryption;
/**
* Creates a new ServerStartMsg. This message is to be sent by an LDAP
@@ -108,107 +107,22 @@
* @throws DataFormatException If the byte array does not contain a valid
* encoded form of the ServerStartMsg.
*/
- public ServerStartMsg(byte[] in) throws DataFormatException
+ ServerStartMsg(byte[] in) throws DataFormatException
{
- byte[] allowedPduTypes = new byte[1];
- allowedPduTypes[0] = MSG_TYPE_SERVER_START;
- headerLength = decodeHeader(allowedPduTypes, in);
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ decodeHeader(scanner, MSG_TYPE_SERVER_START);
- try
- {
- /* first bytes are the header */
- int pos = headerLength;
-
- /*
- * read the dn
- * first calculate the length then construct the string
- */
- int length = getNextLength(in, pos);
- baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /*
- * read the ServerId
- */
- length = getNextLength(in, pos);
- String serverIdString = new String(in, pos, length, "UTF-8");
- serverId = Integer.valueOf(serverIdString);
- pos += length +1;
-
- /*
- * read the ServerURL
- */
- length = getNextLength(in, pos);
- serverURL = new String(in, pos, length, "UTF-8");
- pos += length +1;
-
- /*
- * read the maxReceiveDelay
- */
- length = getNextLength(in, pos);
- maxReceiveDelay = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /*
- * read the maxReceiveQueue
- */
- length = getNextLength(in, pos);
- maxReceiveQueue = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /*
- * read the maxSendDelay
- */
- length = getNextLength(in, pos);
- maxSendDelay = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /*
- * read the maxSendQueue
- */
- length = getNextLength(in, pos);
- maxSendQueue = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /*
- * read the windowSize
- */
- length = getNextLength(in, pos);
- windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /*
- * read the heartbeatInterval
- */
- length = getNextLength(in, pos);
- heartbeatInterval = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- /*
- * read the sslEncryption setting
- */
- length = getNextLength(in, pos);
- sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- // Read the ServerState
- // Caution: ServerState MUST be the last field. Because ServerState can
- // contain null character (string termination of sererid string ..) it
- // cannot be decoded using getNextLength() like the other fields. The
- // only way is to rely on the end of the input buffer : and that forces
- // the ServerState to be the last. This should be changed and we want to
- // have more than one ServerState field.
- serverState = new ServerState(in, pos, in.length - 1);
-
- }
- catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
- catch (DirectoryException e)
- {
- throw new DataFormatException(e.getLocalizedMessage());
- }
+ baseDN = scanner.nextDN();
+ serverId = scanner.nextIntUTF8();
+ serverURL = scanner.nextString();
+ maxReceiveDelay = scanner.nextIntUTF8();
+ maxReceiveQueue = scanner.nextIntUTF8();
+ maxSendDelay = scanner.nextIntUTF8();
+ maxSendQueue = scanner.nextIntUTF8();
+ windowSize = scanner.nextIntUTF8();
+ heartbeatInterval = scanner.nextIntUTF8();
+ sslEncryption = Boolean.valueOf(scanner.nextString());
+ serverState = scanner.nextServerState();
}
/**
@@ -284,76 +198,26 @@
return serverState;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
- public byte[] getBytes(short sessionProtocolVersion)
+ public byte[] getBytes(short protocolVersion)
{
- try {
- byte[] byteDn = baseDN.toNormalizedString().getBytes("UTF-8");
- byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
- byte[] byteServerUrl = serverURL.getBytes("UTF-8");
- byte[] byteMaxRecvDelay =
- String.valueOf(maxReceiveDelay).getBytes("UTF-8");
- byte[] byteMaxRecvQueue =
- String.valueOf(maxReceiveQueue).getBytes("UTF-8");
- byte[] byteMaxSendDelay =
- String.valueOf(maxSendDelay).getBytes("UTF-8");
- byte[] byteMaxSendQueue =
- String.valueOf(maxSendQueue).getBytes("UTF-8");
- byte[] byteWindowSize =
- String.valueOf(windowSize).getBytes("UTF-8");
- byte[] byteHeartbeatInterval =
- String.valueOf(heartbeatInterval).getBytes("UTF-8");
- byte[] byteSSLEncryption =
- String.valueOf(sslEncryption).getBytes("UTF-8");
- byte[] byteServerState = serverState.getBytes();
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ encodeHeader(MSG_TYPE_SERVER_START, builder, protocolVersion);
- int length = byteDn.length + 1 + byteServerId.length + 1 +
- byteServerUrl.length + 1 +
- byteMaxRecvDelay.length + 1 +
- byteMaxRecvQueue.length + 1 +
- byteMaxSendDelay.length + 1 +
- byteMaxSendQueue.length + 1 +
- byteWindowSize.length + 1 +
- byteHeartbeatInterval.length + 1 +
- byteSSLEncryption.length + 1 +
- byteServerState.length + 1;
-
- /* encode the header in a byte[] large enough to also contain the mods */
- byte resultByteArray[] = encodeHeader(MSG_TYPE_SERVER_START, length,
- sessionProtocolVersion);
- int pos = headerLength;
-
- pos = addByteArray(byteDn, resultByteArray, pos);
-
- pos = addByteArray(byteServerId, resultByteArray, pos);
-
- pos = addByteArray(byteServerUrl, resultByteArray, pos);
-
- pos = addByteArray(byteMaxRecvDelay, resultByteArray, pos);
-
- pos = addByteArray(byteMaxRecvQueue, resultByteArray, pos);
-
- pos = addByteArray(byteMaxSendDelay, resultByteArray, pos);
-
- pos = addByteArray(byteMaxSendQueue, resultByteArray, pos);
-
- pos = addByteArray(byteWindowSize, resultByteArray, pos);
-
- pos = addByteArray(byteHeartbeatInterval, resultByteArray, pos);
-
- pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
-
- pos = addByteArray(byteServerState, resultByteArray, pos);
-
- return resultByteArray;
- }
- catch (UnsupportedEncodingException e)
- {
- return null;
- }
+ builder.append(baseDN);
+ builder.appendUTF8(serverId);
+ builder.append(serverURL);
+ builder.appendUTF8(maxReceiveDelay);
+ builder.appendUTF8(maxReceiveQueue);
+ builder.appendUTF8(maxSendDelay);
+ builder.appendUTF8(maxSendQueue);
+ builder.appendUTF8(windowSize);
+ builder.appendUTF8(heartbeatInterval);
+ builder.append(Boolean.toString(sslEncryption));
+ // Caution: ServerState MUST be the last field.
+ builder.append(serverState);
+ return builder.toByteArray();
}
/**
@@ -390,9 +254,7 @@
return sslEncryption;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public String toString()
{
diff --git a/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java b/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
index d23a0f1..694c2f5 100644
--- a/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
@@ -22,12 +22,10 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2013 ForgeRock AS.
+ * Portions copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -150,7 +148,7 @@
* @throws java.util.zip.DataFormatException If the byte array does not
* contain a valid encoded form of the message.
*/
- public StartECLSessionMsg(byte[] in) throws DataFormatException
+ StartECLSessionMsg(byte[] in) throws DataFormatException
{
/*
* The message is stored in the form:
@@ -158,68 +156,25 @@
* <list of referrals urls>
* (each referral url terminates with 0)
*/
-
- try
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ final byte msgType = scanner.nextByte();
+ if (msgType != MSG_TYPE_START_ECL_SESSION)
{
- // first bytes are the header
- int pos = 0;
+ throw new DataFormatException("Input is not a valid "
+ + getClass().getCanonicalName());
+ }
- // first byte is the type
- if (in.length < 1 || in[pos++] != MSG_TYPE_START_ECL_SESSION)
- {
- throw new DataFormatException(
- "Input is not a valid " + this.getClass().getCanonicalName());
- }
-
- // start mode
- int length = getNextLength(in, pos);
- int requestType = Integer.parseInt(new String(in, pos, length, "UTF-8"));
- eclRequestType = ECLRequestType.values()[requestType];
- pos += length +1;
-
- length = getNextLength(in, pos);
- firstChangeNumber = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- length = getNextLength(in, pos);
- lastChangeNumber = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length +1;
-
- length = getNextLength(in, pos);
- csn = new CSN(new String(in, pos, length, "UTF-8"));
- pos += length + 1;
-
- // persistentSearch mode
- length = getNextLength(in, pos);
- int persistent = Integer.parseInt(new String(in, pos, length, "UTF-8"));
- isPersistent = Persistent.values()[persistent];
- pos += length + 1;
-
- // generalized state
- length = getNextLength(in, pos);
- crossDomainServerState = new String(in, pos, length, "UTF-8");
- pos += length + 1;
-
- length = getNextLength(in, pos);
- operationId = new String(in, pos, length, "UTF-8");
- pos += length + 1;
-
- // excluded DN
- length = getNextLength(in, pos);
- String excludedDNsString = new String(in, pos, length, "UTF-8");
- if (excludedDNsString.length()>0)
- {
- String[] excludedDNsStr = excludedDNsString.split(";");
- Collections.addAll(this.excludedBaseDNs, excludedDNsStr);
- }
- pos += length + 1;
-
- } catch (UnsupportedEncodingException e)
+ eclRequestType = ECLRequestType.values()[scanner.nextIntUTF8()];
+ firstChangeNumber = scanner.nextIntUTF8();
+ lastChangeNumber = scanner.nextIntUTF8();
+ csn = scanner.nextCSNUTF8();
+ isPersistent = Persistent.values()[scanner.nextIntUTF8()];
+ crossDomainServerState = scanner.nextString();
+ operationId = scanner.nextString();
+ final String excludedDNsString = scanner.nextString();
+ if (excludedDNsString.length() > 0)
{
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- } catch (IllegalArgumentException e)
- {
- throw new DataFormatException(e.getMessage());
+ Collections.addAll(excludedBaseDNs, excludedDNsString.split(";"));
}
}
@@ -238,71 +193,26 @@
excludedBaseDNs = new HashSet<String>();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte[] getBytes(short protocolVersion)
{
- String excludedBaseDNsString =
- StaticUtils.collectionToString(excludedBaseDNs, ";");
-
- try
- {
- byte[] byteMode = toBytes(eclRequestType.ordinal());
- // FIXME JNR Changing the lines below to use long would require a protocol
- // version change. Leave it like this for now until the need arises.
- byte[] byteChangeNumber = toBytes((int) firstChangeNumber);
- byte[] byteStopChangeNumber = toBytes((int) lastChangeNumber);
- byte[] byteCSN = csn.toString().getBytes("UTF-8");
- byte[] bytePsearch = toBytes(isPersistent.ordinal());
- byte[] byteGeneralizedState = toBytes(crossDomainServerState);
- byte[] byteOperationId = toBytes(operationId);
- byte[] byteExcludedDNs = toBytes(excludedBaseDNsString);
-
- int length =
- byteMode.length + 1 +
- byteChangeNumber.length + 1 +
- byteStopChangeNumber.length + 1 +
- byteCSN.length + 1 +
- bytePsearch.length + 1 +
- byteGeneralizedState.length + 1 +
- byteOperationId.length + 1 +
- byteExcludedDNs.length + 1 +
- 1;
-
- byte[] resultByteArray = new byte[length];
- int pos = 0;
- resultByteArray[pos++] = MSG_TYPE_START_ECL_SESSION;
- pos = addByteArray(byteMode, resultByteArray, pos);
- pos = addByteArray(byteChangeNumber, resultByteArray, pos);
- pos = addByteArray(byteStopChangeNumber, resultByteArray, pos);
- pos = addByteArray(byteCSN, resultByteArray, pos);
- pos = addByteArray(bytePsearch, resultByteArray, pos);
- pos = addByteArray(byteGeneralizedState, resultByteArray, pos);
- pos = addByteArray(byteOperationId, resultByteArray, pos);
- pos = addByteArray(byteExcludedDNs, resultByteArray, pos);
- return resultByteArray;
- } catch (IOException e)
- {
- // never happens
- return null;
- }
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ builder.append(MSG_TYPE_START_ECL_SESSION);
+ builder.appendUTF8(eclRequestType.ordinal());
+ // FIXME JNR Changing the lines below to use long would require a protocol
+ // version change. Leave it like this for now until the need arises.
+ builder.appendUTF8((int) firstChangeNumber);
+ builder.appendUTF8((int) lastChangeNumber);
+ builder.appendUTF8(csn);
+ builder.appendUTF8(isPersistent.ordinal());
+ builder.append(crossDomainServerState);
+ builder.append(operationId);
+ builder.append(StaticUtils.collectionToString(excludedBaseDNs, ";"));
+ return builder.toByteArray();
}
- private byte[] toBytes(int i) throws UnsupportedEncodingException
- {
- return toBytes(String.valueOf(i));
- }
-
- private byte[] toBytes(String s) throws UnsupportedEncodingException
- {
- return String.valueOf(s).getBytes("UTF-8");
- }
-
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public String toString()
{
diff --git a/opends/src/server/org/opends/server/replication/protocol/StartMsg.java b/opends/src/server/org/opends/server/replication/protocol/StartMsg.java
index 26df8d7..9cc0aad 100644
--- a/opends/src/server/org/opends/server/replication/protocol/StartMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/StartMsg.java
@@ -22,14 +22,12 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2013 ForgeRock AS.
+ * Portions copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
-
/**
* This abstract message class is the superclass for start messages used
* by LDAP servers and Replication servers to initiate their communications.
@@ -43,12 +41,7 @@
/** Generation id of data set we want to work with. */
protected long generationId;
/** Group id of the replicated domain. */
- protected byte groupId = (byte)-1;
-
- /**
- * The length of the header of this message.
- */
- protected int headerLength;
+ protected byte groupId = -1;
/**
* Create a new StartMsg.
@@ -66,7 +59,7 @@
* @param generationId The generationId for this server.
*
*/
- public StartMsg(short protocolVersion, long generationId)
+ StartMsg(short protocolVersion, long generationId)
{
this.protocolVersion = protocolVersion;
this.generationId = generationId;
@@ -75,196 +68,105 @@
/**
* Encode the header for the start message.
*
- * @param type The type of the message to create.
- * @param additionalLength Additional length needed to encode the remaining
+ * @param msgType The type of the message to create.
+ * @param builder Additional length needed to encode the remaining
* part of the UpdateMessage.
- * @param sessionProtocolVersion The version to use when encoding the header.
- * @return a byte array containing the common header and enough space to
- * encode the remaining bytes of the UpdateMessage as was specified
- * by the additionalLength.
- * (byte array length = common header length + additionalLength)
- * @throws UnsupportedEncodingException if UTF-8 is not supported.
+ * @param protocolVersion The version to use when encoding the header.
*/
- public byte[] encodeHeader(
- byte type, int additionalLength,
- short sessionProtocolVersion)
- throws UnsupportedEncodingException
+ void encodeHeader(byte msgType, ByteArrayBuilder builder, short protocolVersion)
{
-
- byte[] byteGenerationID =
- String.valueOf(generationId).getBytes("UTF-8");
-
/* The message header is stored in the form :
* <message type><protocol version><generation id><group id>
*/
- int length = 1 + 1 + byteGenerationID.length + 1 + 1 +
- additionalLength;
-
- byte[] encodedMsg = new byte[length];
-
- /* put the type of the operation */
- encodedMsg[0] = type;
-
- /* put the protocol version */
- encodedMsg[1] = (byte)sessionProtocolVersion;
-
- /* put the generationId */
- int pos = 2;
- pos = addByteArray(byteGenerationID, encodedMsg, pos);
-
- /* put the group id */
- encodedMsg[pos] = groupId;
-
- pos++;
- headerLength = pos;
-
- return encodedMsg;
+ builder.append(msgType);
+ builder.append((byte) protocolVersion);
+ builder.appendUTF8(generationId);
+ builder.append(groupId);
}
/**
* Encode the header for the start message. This uses the version 1 of the
* replication protocol (used for compatibility purpose).
*
- * @param type The type of the message to create.
- * @param additionalLength additional length needed to encode the remaining
- * part of the UpdateMessage.
- * @return a byte array containing the common header and enough space to
- * encode the remaining bytes of the UpdateMessage as was specified
- * by the additionalLength.
- * (byte array length = common header length + additionalLength)
- * @throws UnsupportedEncodingException if UTF-8 is not supported.
+ * @param msgType The type of the message to create.
+ * @param builder The builder where to append the remaining part of the
+ * UpdateMessage.
*/
- public byte[] encodeHeader_V1(byte type, int additionalLength)
- throws UnsupportedEncodingException
+ void encodeHeader_V1(byte msgType, ByteArrayBuilder builder)
{
- byte[] byteGenerationID =
- String.valueOf(generationId).getBytes("UTF-8");
-
/* The message header is stored in the form :
* <message type><protocol version><generation id>
*/
- int length = 1 + 1 + 1 +
- byteGenerationID.length + 1 +
- additionalLength;
-
- byte[] encodedMsg = new byte[length];
-
- /* put the type of the operation */
- encodedMsg[0] = type;
-
- /* put the protocol version */
- encodedMsg[1] = (byte)ProtocolVersion.REPLICATION_PROTOCOL_V1_REAL;
- encodedMsg[2] = (byte)0;
-
- /* put the generationId */
- int pos = 3;
- headerLength = addByteArray(byteGenerationID, encodedMsg, pos);
-
- return encodedMsg;
+ builder.append(msgType);
+ builder.append((byte) ProtocolVersion.REPLICATION_PROTOCOL_V1_REAL);
+ builder.append((byte) 0);
+ builder.appendUTF8(generationId);
}
/**
* Decode the Header part of this message, and check its type.
*
- * @param types The allowed types of this message.
- * @param encodedMsg the encoded form of the message.
- * @return the position at which the remaining part of the message starts.
+ * @param scanner where to read the message from.
+ * @param allowedTypes The allowed types of this message.
* @throws DataFormatException if the encodedMsg does not contain a valid
* common header.
*/
- public int decodeHeader(byte[] types, byte [] encodedMsg)
- throws DataFormatException
+ void decodeHeader(final ByteArrayScanner scanner, byte... allowedTypes)
+ throws DataFormatException
{
- /* first byte is the type */
- boolean foundMatchingType = false;
- for (byte type : types) {
- if (type == encodedMsg[0]) {
- foundMatchingType = true;
- break;
- }
+ final byte msgType = scanner.nextByte();
+ if (!isTypeAllowed(allowedTypes, msgType))
+ {
+ throw new DataFormatException("byte[] is not a valid start msg: "
+ + msgType);
}
- if (!foundMatchingType)
- throw new DataFormatException("byte[] is not a valid start msg: " +
- encodedMsg[0]);
+
+ final byte version = scanner.nextByte();
// Filter for supported old versions PDUs
- if (encodedMsg[0] == MSG_TYPE_REPL_SERVER_START_V1)
- return decodeHeader_V1(MSG_TYPE_REPL_SERVER_START_V1, encodedMsg);
-
- try
+ if (msgType == MSG_TYPE_REPL_SERVER_START_V1)
{
- /* then read the version */
- short readVersion = (short)encodedMsg[1];
- if (readVersion < ProtocolVersion.REPLICATION_PROTOCOL_V2)
- throw new DataFormatException("Not a valid message: type is " +
- encodedMsg[0] + " but protocol version byte is " + readVersion +
- " instead of " + ProtocolVersion.getCurrentVersion());
- protocolVersion = readVersion;
+ if (version != ProtocolVersion.REPLICATION_PROTOCOL_V1_REAL)
+ {
+ throw new DataFormatException("Not a valid message: type is " + msgType
+ + " but protocol version byte is " + version + " instead of "
+ + ProtocolVersion.REPLICATION_PROTOCOL_V1_REAL);
+ }
- /* read the generationId */
- int pos = 2;
- int length = getNextLength(encodedMsg, pos);
- generationId = Long.valueOf(new String(encodedMsg, pos, length,
- "UTF-8"));
- pos += length +1;
+ // Force version to V1
+ // We need to translate the MSG_TYPE_REPL_SERVER_START_V1 version
+ // into REPLICATION_PROTOCOL_V1 so that we only see V1 everywhere.
+ protocolVersion = ProtocolVersion.REPLICATION_PROTOCOL_V1;
- /* read the group id */
- groupId = encodedMsg[pos];
- pos++;
-
- return pos;
- } catch (UnsupportedEncodingException e)
+ // In V1, version was 1 (49) in string, so with a null
+ // terminating string. Let's position the cursor at the next byte
+ scanner.skipZeroSeparator();
+ generationId = scanner.nextLongUTF8();
+ }
+ else
{
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ if (version < ProtocolVersion.REPLICATION_PROTOCOL_V2)
+ {
+ throw new DataFormatException("Not a valid message: type is " + msgType
+ + " but protocol version byte is " + version + " instead of "
+ + ProtocolVersion.getCurrentVersion());
+ }
+ protocolVersion = version;
+ generationId = scanner.nextLongUTF8();
+ groupId = scanner.nextByte();
}
}
- /**
- * Decode the Header part of this message, and check its type. This uses the
- * version 1 of the replication protocol (used for compatibility purpose).
- *
- * @param type The type of this message.
- * @param encodedMsg the encoded form of the message.
- * @return the position at which the remaining part of the message starts.
- * @throws DataFormatException if the encodedMsg does not contain a valid
- * common header.
- */
- public int decodeHeader_V1(byte type, byte [] encodedMsg)
- throws DataFormatException
+ private boolean isTypeAllowed(byte[] allowedTypes, final byte msgType)
{
- if (encodedMsg[0] != type)
- throw new DataFormatException("byte[] is not a valid start msg: expected "
- + " a V1 PDU, received: " + encodedMsg[0]);
-
- if (encodedMsg[1] != ProtocolVersion.REPLICATION_PROTOCOL_V1_REAL)
+ for (byte allowedType : allowedTypes)
{
- throw new DataFormatException("Not a valid message: type is " +
- type + " but protocol version byte is " + encodedMsg[1] + " instead of "
- + ProtocolVersion.REPLICATION_PROTOCOL_V1_REAL);
+ if (msgType == allowedType)
+ {
+ return true;
+ }
}
-
- // Force version to V1
- // We need to translate the MSG_TYPE_REPL_SERVER_START_V1 version
- // into REPLICATION_PROTOCOL_V1 so that we only see V1 everywhere.
- protocolVersion = ProtocolVersion.REPLICATION_PROTOCOL_V1;
-
- try
- {
- // In V1, version was 1 (49) in string, so with a null
- // terminating string. Let's position the cursor at the next byte
- int pos = 3;
-
- /* read the generationId */
- int length = getNextLength(encodedMsg, pos);
- generationId = Long.valueOf(new String(encodedMsg, pos, length,
- "UTF-8"));
- pos += length +1;
-
- return pos;
- } catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
+ return false;
}
/**
diff --git a/opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java b/opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
index 87913ce..7d0370c 100644
--- a/opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
@@ -26,9 +26,6 @@
*/
package org.opends.server.replication.protocol;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.zip.DataFormatException;
@@ -80,7 +77,7 @@
* @throws java.util.zip.DataFormatException If the byte array does not
* contain a valid encoded form of the message.
*/
- public StartSessionMsg(byte[] in, short version) throws DataFormatException
+ StartSessionMsg(byte[] in, short version) throws DataFormatException
{
if (version <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
{
@@ -110,35 +107,21 @@
this.safeDataLevel = safeDataLevel;
}
- /**
- * Creates a new message with the given required parameters.
- * Assured mode is false.
- * @param status Status we are starting with
- * @param referralsURLs Referrals URLs to be used by peer DSs
- */
- public StartSessionMsg(ServerStatus status, Collection<String> referralsURLs)
- {
- this.referralsURLs.addAll(referralsURLs);
- this.status = status;
- this.assuredFlag = false;
- }
-
// ============
// Msg encoding
// ============
/** {@inheritDoc} */
@Override
- public byte[] getBytes(short reqProtocolVersion)
- throws UnsupportedEncodingException
+ public byte[] getBytes(short protocolVersion)
{
- if (reqProtocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
+ if (protocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
{
return getBytes_V23();
}
else
{
- return getBytes_V45(reqProtocolVersion);
+ return getBytes_V45(protocolVersion);
}
}
@@ -157,7 +140,9 @@
writer.writeStartSequence();
for (String url : referralsURLs)
+ {
writer.writeOctetString(url);
+ }
writer.writeEndSequence();
writer.writeStartSequence();
@@ -193,57 +178,37 @@
* <list of referrals urls>
* (each referral url terminates with 0)
*/
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ builder.append(MSG_TYPE_START_SESSION);
+ builder.append(status.getValue());
+ builder.append(assuredFlag);
+ builder.append(assuredMode.getValue());
+ builder.append(safeDataLevel);
- try
+ if (referralsURLs.size() >= 1)
{
- ByteArrayOutputStream oStream = new ByteArrayOutputStream();
-
- /* Put the message type */
- oStream.write(MSG_TYPE_START_SESSION);
-
- // Put the status
- oStream.write(status.getValue());
-
- // Put the assured flag
- oStream.write(assuredFlag ? (byte) 1 : (byte) 0);
-
- // Put assured mode
- oStream.write(assuredMode.getValue());
-
- // Put safe data level
- oStream.write(safeDataLevel);
-
- // Put the referrals URLs
- if (referralsURLs.size() >= 1)
+ for (String url : referralsURLs)
{
- for (String url : referralsURLs)
- {
- byte[] byteArrayURL = url.getBytes("UTF-8");
- oStream.write(byteArrayURL);
- oStream.write(0);
- }
+ builder.append(url);
}
- return oStream.toByteArray();
- } catch (IOException e)
- {
- // never happens
- return null;
}
+ return builder.toByteArray();
}
// ============
// Msg decoding
// ============
- private void decode_V45(byte[] in, short version)
- throws DataFormatException
+ private void decode_V45(byte[] in, short version) throws DataFormatException
{
ByteSequenceReader reader = ByteString.wrap(in).asReader();
try
{
if (reader.get() != MSG_TYPE_START_SESSION)
- throw new DataFormatException("input is not a valid " +
- this.getClass().getCanonicalName());
+ {
+ throw new DataFormatException("input is not a valid "
+ + getClass().getCanonicalName());
+ }
/*
status = ServerStatus.valueOf(asn1Reader.readOctetString().byteAt(0));
@@ -279,8 +244,7 @@
asn1Reader.readStartSequence();
while (asn1Reader.hasNextElement())
{
- String s = asn1Reader.readOctetStringAsString();
- this.eclIncludesForDeletes.add(s);
+ this.eclIncludesForDeletes.add(asn1Reader.readOctetStringAsString());
}
asn1Reader.readEndSequence();
}
@@ -296,8 +260,7 @@
}
}
- private void decode_V23(byte[] in)
- throws DataFormatException
+ private void decode_V23(byte[] in) throws DataFormatException
{
/*
* The message is stored in the form:
@@ -305,46 +268,22 @@
* <list of referrals urls>
* (each referral url terminates with 0)
*/
-
- try
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ final byte msgType = scanner.nextByte();
+ if (msgType != MSG_TYPE_START_SESSION)
{
- /* first byte is the type */
- if (in.length < 1 || in[0] != MSG_TYPE_START_SESSION)
- {
- throw new DataFormatException(
- "Input is not a valid " + this.getClass().getCanonicalName());
- }
+ throw new DataFormatException(
+ "Input is not a valid " + getClass().getCanonicalName());
+ }
- /* Read the status */
- status = ServerStatus.valueOf(in[1]);
+ status = ServerStatus.valueOf(scanner.nextByte());
+ assuredFlag = scanner.nextBoolean();
+ assuredMode = AssuredMode.valueOf(scanner.nextByte());
+ safeDataLevel = scanner.nextByte();
- /* Read the assured flag */
- assuredFlag = in[2] == 1;
-
- /* Read the assured mode */
- assuredMode = AssuredMode.valueOf(in[3]);
-
- /* Read the safe data level */
- safeDataLevel = in[4];
-
- /* Read the referrals URLs */
- int pos = 5;
- while (pos < in.length)
- {
- /*
- * Read the next URL
- * first calculate the length then construct the string
- */
- int length = getNextLength(in, pos);
- referralsURLs.add(new String(in, pos, length, "UTF-8"));
- pos += length + 1;
- }
- } catch (UnsupportedEncodingException e)
+ while (!scanner.isEmpty())
{
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- } catch (IllegalArgumentException e)
- {
- throw new DataFormatException(e.getMessage());
+ referralsURLs.add(scanner.nextString());
}
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java b/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
index 61c793f..56a1bbf 100644
--- a/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -26,9 +26,6 @@
*/
package org.opends.server.replication.protocol;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.zip.DataFormatException;
@@ -37,6 +34,8 @@
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerStatus;
+import static org.opends.server.replication.protocol.ProtocolVersion.*;
+
/**
* This class defines a message that is sent:
* - By a RS to the other RSs in the topology, containing:
@@ -68,173 +67,102 @@
* @throws java.util.zip.DataFormatException If the byte array does not
* contain a valid encoded form of the message.
*/
- public TopologyMsg(byte[] in, short version) throws DataFormatException
+ TopologyMsg(byte[] in, short version) throws DataFormatException
{
- try
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ final byte msgType = scanner.nextByte();
+ if (msgType != MSG_TYPE_TOPOLOGY)
{
- /* First byte is the type */
- if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY)
- {
- throw new DataFormatException(
- "Input is not a valid " + getClass().getCanonicalName());
- }
-
- int pos = 1;
-
- /* Read number of following DS info entries */
- byte nDsInfo = in[pos++];
-
- /* Read the DS info entries */
- Map<Integer, DSInfo> replicaInfos =
- new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo));
- while (nDsInfo > 0 && pos < in.length)
- {
- /* Read DS id */
- int length = getNextLength(in, pos);
- String serverIdString = new String(in, pos, length, "UTF-8");
- int dsId = Integer.valueOf(serverIdString);
- pos += length + 1;
-
- /* Read DS URL */
- String dsUrl;
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V6)
- {
- length = getNextLength(in, pos);
- dsUrl = new String(in, pos, length, "UTF-8");
- pos += length + 1;
- }
- else
- {
- dsUrl = "";
- }
-
- /* Read RS id */
- length = getNextLength(in, pos);
- serverIdString = new String(in, pos, length, "UTF-8");
- int rsId = Integer.valueOf(serverIdString);
- pos += length + 1;
-
- /* Read the generation id */
- length = getNextLength(in, pos);
- long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length + 1;
-
- /* Read DS status */
- ServerStatus status = ServerStatus.valueOf(in[pos++]);
-
- /* Read DS assured flag */
- boolean assuredFlag = in[pos++] == 1;
-
- /* Read DS assured mode */
- AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
-
- /* Read DS safe data level */
- byte safeDataLevel = in[pos++];
-
- /* Read DS group id */
- byte groupId = in[pos++];
-
- /* Read number of referrals URLs */
- List<String> refUrls = new ArrayList<String>();
- pos = readStrings(in, pos, refUrls);
-
- Set<String> attrs = new HashSet<String>();
- Set<String> delattrs = new HashSet<String>();
- short protocolVersion = -1;
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- pos = readStrings(in, pos, attrs);
-
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
- {
- pos = readStrings(in, pos, delattrs);
- }
- else
- {
- // Default to using the same set of attributes for deletes.
- delattrs.addAll(attrs);
- }
-
- /* Read Protocol version */
- protocolVersion = in[pos++];
- }
-
- /* Now create DSInfo and store it */
- replicaInfos.put(dsId, new DSInfo(dsId, dsUrl, rsId, generationId,
- status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls,
- attrs, delattrs, protocolVersion));
-
- nDsInfo--;
- }
-
- /* Read number of following RS info entries */
- byte nRsInfo = in[pos++];
-
- /* Read the RS info entries */
- List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
- while (nRsInfo > 0 && pos < in.length)
- {
- /* Read RS id */
- int length = getNextLength(in, pos);
- String serverIdString = new String(in, pos, length, "UTF-8");
- int id = Integer.valueOf(serverIdString);
- pos += length + 1;
-
- /* Read the generation id */
- length = getNextLength(in, pos);
- long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length + 1;
-
- /* Read RS group id */
- byte groupId = in[pos++];
-
- int weight = 1;
- String serverUrl = null;
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- length = getNextLength(in, pos);
- serverUrl = new String(in, pos, length, "UTF-8");
- pos += length + 1;
-
- /* Read RS weight */
- length = getNextLength(in, pos);
- weight = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length + 1;
- }
-
- /* Now create RSInfo and store it */
- rsInfos.add(new RSInfo(id, serverUrl, generationId, groupId, weight));
-
- nRsInfo--;
- }
-
- this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
- this.rsInfos = Collections.unmodifiableList(rsInfos);
+ throw new DataFormatException("Input is not a valid "
+ + getClass().getCanonicalName());
}
- catch (UnsupportedEncodingException e)
+
+ // Read the DS info entries, first read number of them
+ int nDsInfo = scanner.nextByte();
+ final Map<Integer, DSInfo> replicaInfos =
+ new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo));
+ while (nDsInfo > 0 && !scanner.isEmpty())
{
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ final DSInfo dsInfo = nextDSInfo(scanner, version);
+ replicaInfos.put(dsInfo.getDsId(), dsInfo);
+ nDsInfo--;
}
+
+ // Read the RS info entries
+ int nRsInfo = scanner.nextByte();
+ final List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
+ while (nRsInfo > 0 && !scanner.isEmpty())
+ {
+ rsInfos.add(nextRSInfo(scanner, version));
+ nRsInfo--;
+ }
+
+ this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
+ this.rsInfos = Collections.unmodifiableList(rsInfos);
}
- private int readStrings(byte[] in, int pos, Collection<String> outputCol)
- throws DataFormatException, UnsupportedEncodingException
+ private DSInfo nextDSInfo(ByteArrayScanner scanner, short version)
+ throws DataFormatException
{
- byte nAttrs = in[pos++];
- byte nRead = 0;
- // Read all elements until expected number read
- while (nRead != nAttrs && pos < in.length)
+ final int dsId = scanner.nextIntUTF8();
+ final String dsUrl =
+ version < REPLICATION_PROTOCOL_V6 ? "" : scanner.nextString();
+ final int rsId = scanner.nextIntUTF8();
+ final long generationId = scanner.nextLongUTF8();
+ final ServerStatus status = ServerStatus.valueOf(scanner.nextByte());
+ final boolean assuredFlag = scanner.nextBoolean();
+ final AssuredMode assuredMode = AssuredMode.valueOf(scanner.nextByte());
+ final byte safeDataLevel = scanner.nextByte();
+ final byte groupId = scanner.nextByte();
+
+ final List<String> refUrls = new ArrayList<String>();
+ scanner.nextStrings(refUrls);
+
+ final Set<String> attrs = new HashSet<String>();
+ final Set<String> delattrs = new HashSet<String>();
+ short protocolVersion = -1;
+ if (version >= REPLICATION_PROTOCOL_V4)
{
- int length = getNextLength(in, pos);
- outputCol.add(new String(in, pos, length, "UTF-8"));
- pos += length + 1;
- nRead++;
+ scanner.nextStrings(attrs);
+
+ if (version >= REPLICATION_PROTOCOL_V5)
+ {
+ scanner.nextStrings(delattrs);
+ }
+ else
+ {
+ // Default to using the same set of attributes for deletes.
+ delattrs.addAll(attrs);
+ }
+
+ protocolVersion = scanner.nextByte();
}
- return pos;
+
+ return new DSInfo(dsId, dsUrl, rsId, generationId, status, assuredFlag,
+ assuredMode, safeDataLevel, groupId, refUrls, attrs, delattrs,
+ protocolVersion);
+ }
+
+ private RSInfo nextRSInfo(ByteArrayScanner scanner, short version)
+ throws DataFormatException
+ {
+ final int rsId = scanner.nextIntUTF8();
+ final long generationId = scanner.nextLongUTF8();
+ final byte groupId = scanner.nextByte();
+
+ int weight = 1;
+ String serverUrl = null;
+ if (version >= REPLICATION_PROTOCOL_V4)
+ {
+ serverUrl = scanner.nextString();
+ weight = scanner.nextIntUTF8();
+ }
+
+ return new RSInfo(rsId, serverUrl, generationId, groupId, weight);
}
/**
- * Creates a new message of the currently connected servers.
+ * Creates a new message of the currently connected servers.
*
* @param dsInfos The collection of currently connected DS servers ID.
* @param rsInfos The list of currently connected RS servers ID.
@@ -272,122 +200,62 @@
/** {@inheritDoc} */
@Override
- public byte[] getBytes(short version) throws UnsupportedEncodingException
+ public byte[] getBytes(short version)
{
- try
+ /**
+ * Message has the following form:
+ * <pdu type><number of following DSInfo entries>[<DSInfo>]*
+ * <number of following RSInfo entries>[<RSInfo>]*
+ */
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ builder.append(MSG_TYPE_TOPOLOGY);
+
+ // Put DS infos
+ builder.append((byte) replicaInfos.size());
+ for (DSInfo dsInfo : replicaInfos.values())
{
- /**
- * Message has the following form:
- * <pdu type><number of following DSInfo entries>[<DSInfo>]*
- * <number of following RSInfo entries>[<RSInfo>]*
- */
- ByteArrayOutputStream oStream = new ByteArrayOutputStream();
-
- /* Put the message type */
- oStream.write(MSG_TYPE_TOPOLOGY);
-
- // Put number of following DS info entries
- oStream.write((byte) replicaInfos.size());
-
- // Put DS info
- for (DSInfo dsInfo : replicaInfos.values())
+ builder.appendUTF8(dsInfo.getDsId());
+ if (version >= REPLICATION_PROTOCOL_V6)
{
- // Put DS id
- byte[] byteServerId =
- String.valueOf(dsInfo.getDsId()).getBytes("UTF-8");
- oStream.write(byteServerId);
- oStream.write(0);
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V6)
- {
- // Put DS URL
- oStream.write(dsInfo.getDsUrl().getBytes("UTF-8"));
- oStream.write(0);
- }
- // Put RS id
- byteServerId = String.valueOf(dsInfo.getRsId()).getBytes("UTF-8");
- oStream.write(byteServerId);
- oStream.write(0);
- // Put the generation id
- oStream.write(String.valueOf(dsInfo.getGenerationId()).
- getBytes("UTF-8"));
- oStream.write(0);
- // Put DS status
- oStream.write(dsInfo.getStatus().getValue());
- // Put DS assured flag
- oStream.write(dsInfo.isAssured() ? (byte) 1 : (byte) 0);
- // Put DS assured mode
- oStream.write(dsInfo.getAssuredMode().getValue());
- // Put DS safe data level
- oStream.write(dsInfo.getSafeDataLevel());
- // Put DS group id
- oStream.write(dsInfo.getGroupId());
-
- writeStrings(oStream, dsInfo.getRefUrls());
-
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- // Put ECL includes
- writeStrings(oStream, dsInfo.getEclIncludes());
-
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
- {
- writeStrings(oStream, dsInfo.getEclIncludesForDeletes());
- }
-
- oStream.write(dsInfo.getProtocolVersion());
- }
+ builder.append(dsInfo.getDsUrl());
}
+ builder.appendUTF8(dsInfo.getRsId());
+ builder.appendUTF8(dsInfo.getGenerationId());
+ builder.append(dsInfo.getStatus().getValue());
+ builder.append(dsInfo.isAssured());
+ builder.append(dsInfo.getAssuredMode().getValue());
+ builder.append(dsInfo.getSafeDataLevel());
+ builder.append(dsInfo.getGroupId());
- // Put number of following RS info entries
- oStream.write((byte) rsInfos.size());
+ builder.appendStrings(dsInfo.getRefUrls());
- // Put RS info
- for (RSInfo rsInfo : rsInfos)
+ if (version >= REPLICATION_PROTOCOL_V4)
{
- // Put RS id
- byte[] byteServerId =
- String.valueOf(rsInfo.getId()).getBytes("UTF-8");
- oStream.write(byteServerId);
- oStream.write(0);
- // Put the generation id
- oStream.write(String.valueOf(rsInfo.getGenerationId()).
- getBytes("UTF-8"));
- oStream.write(0);
- // Put RS group id
- oStream.write(rsInfo.getGroupId());
-
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ builder.appendStrings(dsInfo.getEclIncludes());
+ if (version >= REPLICATION_PROTOCOL_V5)
{
- // Put server URL
- oStream.write(rsInfo.getServerUrl().getBytes("UTF-8"));
- oStream.write(0);
-
- // Put RS weight
- oStream.write(String.valueOf(rsInfo.getWeight()).getBytes("UTF-8"));
- oStream.write(0);
+ builder.appendStrings(dsInfo.getEclIncludesForDeletes());
}
+ builder.append((byte) dsInfo.getProtocolVersion());
}
+ }
- return oStream.toByteArray();
- }
- catch (IOException e)
+ // Put RS infos
+ builder.append((byte) rsInfos.size());
+ for (RSInfo rsInfo : rsInfos)
{
- // never happens
- throw new RuntimeException(e);
- }
- }
+ builder.appendUTF8(rsInfo.getId());
+ builder.appendUTF8(rsInfo.getGenerationId());
+ builder.append(rsInfo.getGroupId());
- private void writeStrings(ByteArrayOutputStream oStream,
- Collection<String> col) throws IOException, UnsupportedEncodingException
- {
- // Put collection length as a byte
- oStream.write(col.size());
- for (String elem : col)
- {
- // Write the element and a 0 terminating byte
- oStream.write(elem.getBytes("UTF-8"));
- oStream.write(0);
+ if (version >= REPLICATION_PROTOCOL_V4)
+ {
+ builder.append(rsInfo.getServerUrl());
+ builder.appendUTF8(rsInfo.getWeight());
+ }
}
+
+ return builder.toByteArray();
}
/** {@inheritDoc} */
@@ -414,7 +282,7 @@
+ "CONNECTED RS SERVERS:"
+ "\n--------------------\n"
+ rsStr
- + (rsStr.equals("") ? "----------------------------\n" : "");
+ + ("".equals(rsStr) ? "----------------------------\n" : "");
}
/**
diff --git a/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java b/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
index dbcc51a..28febb3 100644
--- a/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -22,16 +22,17 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2013 ForgeRock AS.
+ * Portions Copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.CSN;
+import static org.opends.server.replication.protocol.ByteArrayBuilder.*;
+
/**
* Abstract class that must be extended to define a message
* used for sending Updates between servers.
@@ -39,42 +40,31 @@
public class UpdateMsg extends ReplicationMsg
implements Comparable<UpdateMsg>
{
- /**
- * Protocol version.
- */
+ /** Protocol version. */
protected short protocolVersion;
- /**
- * The CSN of this update.
- */
+ /** The CSN of this update. */
protected CSN csn;
- /**
- * True when the update must use assured replication.
- */
+ /** True when the update must use assured replication. */
protected boolean assuredFlag = false;
- /**
- * When assuredFlag is true, defines the requested assured mode.
- */
+ /** When assuredFlag is true, defines the requested assured mode. */
protected AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
- /**
- * When assured mode is safe data, gives the requested level.
- */
+ /** When assured mode is safe data, gives the requested level. */
protected byte safeDataLevel = (byte)1;
- /**
- * The payload that must be encoded in this message.
- */
- private byte[] payload;
-
+ /** The payload that must be encoded in this message. */
+ private final byte[] payload;
/**
* Creates a new empty UpdateMsg.
*/
protected UpdateMsg()
- {}
+ {
+ payload = null;
+ }
/**
* Creates a new UpdateMsg with the given information.
@@ -85,25 +75,10 @@
*/
UpdateMsg(byte[] bytes) throws DataFormatException
{
- // Decode header
- int pos = decodeHeader(MSG_TYPE_GENERIC_UPDATE, bytes);
-
+ final ByteArrayScanner scanner = new ByteArrayScanner(bytes);
+ decodeHeader(MSG_TYPE_GENERIC_UPDATE, scanner);
// Read the payload : all the remaining bytes but the terminating 0
- int length = bytes.length - pos;
- payload = new byte[length];
- try
- {
- System.arraycopy(bytes, pos, payload, 0, length);
- } catch (IndexOutOfBoundsException e)
- {
- throw new DataFormatException(e.getMessage());
- } catch (ArrayStoreException e)
- {
- throw new DataFormatException(e.getMessage());
- } catch (NullPointerException e)
- {
- throw new DataFormatException(e.getMessage());
- }
+ payload = scanner.remainingBytes();
}
/**
@@ -152,9 +127,7 @@
assuredFlag = assured;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public boolean equals(Object obj)
{
@@ -162,18 +135,14 @@
csn.equals(((UpdateMsg) obj).csn);
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public int hashCode()
{
return csn.hashCode();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public int compareTo(UpdateMsg msg)
{
@@ -243,103 +212,50 @@
* Encode the common header for all the UpdateMsg. This uses the current
* protocol version.
*
- * @param type the type of UpdateMsg to encode.
- * @param additionalLength additional length needed to encode the remaining
- * part of the UpdateMsg.
- * @param version The ProtocolVersion to use when encoding.
- * @return a byte array containing the common header and enough space to
- * encode the remaining bytes of the UpdateMsg as was specified
- * by the additionalLength.
- * (byte array length = common header length + additionalLength)
- * @throws UnsupportedEncodingException if UTF-8 is not supported.
+ * @param msgType The type of UpdateMsg to encode.
+ * @param protocolVersion The ProtocolVersion to use when encoding.
+ * @return a byte array builder containing the common header
*/
- protected byte[] encodeHeader(byte type, int additionalLength, short version)
- throws UnsupportedEncodingException
+ protected ByteArrayBuilder encodeHeader(byte msgType, short protocolVersion)
{
- byte[] csnByte = getCSN().toString().getBytes("UTF-8");
-
- /* The message header is stored in the form :
- * <operation type><protocol version><CSN><assured>
- * <assured mode> <safe data level>
- * the length of result byte array is therefore :
- * 1 + 1 + CSN length + 1 + 1
- * + 1 + 1 + additional_length
- */
- int length = 6 + csnByte.length + additionalLength;
-
- byte[] encodedMsg = new byte[length];
-
- // put the type of the operation
- encodedMsg[0] = type;
-
- // put the protocol version
- encodedMsg[1] = (byte)ProtocolVersion.getCurrentVersion();
- int pos = 2;
-
- // Put the CSN
- pos = addByteArray(csnByte, encodedMsg, pos);
-
- // Put the assured flag
- encodedMsg[pos++] = (assuredFlag ? (byte) 1 : 0);
-
- // Put the assured mode
- encodedMsg[pos++] = assuredMode.getValue();
-
- // Put the safe data level
- encodedMsg[pos++] = safeDataLevel;
-
- return encodedMsg;
+ final ByteArrayBuilder builder =
+ new ByteArrayBuilder(bytes(6) + csnsUTF8(1));
+ builder.append(msgType);
+ builder.append((byte) ProtocolVersion.getCurrentVersion());
+ builder.appendUTF8(getCSN());
+ builder.append(assuredFlag);
+ builder.append(assuredMode.getValue());
+ builder.append(safeDataLevel);
+ return builder;
}
/**
* Decode the Header part of this Update Message, and check its type.
*
- * @param type The allowed type of this Update Message.
- * @param encodedMsg the encoded form of the UpdateMsg.
- * @return the position at which the remaining part of the message starts.
- * @throws DataFormatException if the encodedMsg does not contain a valid
- * common header.
+ * @param allowedType The allowed type of this Update Message.
+ * @param scanner The encoded form of the UpdateMsg.
+ * @throws DataFormatException
+ * if the scanner does not contain a valid common header.
*/
- protected int decodeHeader(byte type, byte[] encodedMsg)
- throws DataFormatException
+ protected void decodeHeader(byte allowedType, ByteArrayScanner scanner)
+ throws DataFormatException
{
/* The message header is stored in the form :
* <operation type><protocol version><CSN><assured>
* <assured mode> <safe data level>
*/
- if (!(type == encodedMsg[0]))
+ final byte msgType = scanner.nextByte();
+ if (allowedType != msgType)
+ {
throw new DataFormatException("byte[] is not a valid update msg: "
- + encodedMsg[0]);
-
- // read the protocol version
- protocolVersion = encodedMsg[1];
-
- try
- {
- // Read the CSN
- int pos = 2;
- int length = getNextLength(encodedMsg, pos);
- String csnStr = new String(encodedMsg, pos, length, "UTF-8");
- pos += length + 1;
- csn = new CSN(csnStr);
-
- // Read the assured information
- assuredFlag = encodedMsg[pos++] == 1;
-
- // Read the assured mode
- assuredMode = AssuredMode.valueOf(encodedMsg[pos++]);
-
- // Read the safe data level
- safeDataLevel = encodedMsg[pos++];
-
- return pos;
- } catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- } catch (IllegalArgumentException e)
- {
- throw new DataFormatException(e.getMessage());
+ + msgType);
}
+
+ protocolVersion = scanner.nextByte();
+ csn = scanner.nextCSNUTF8();
+ assuredFlag = scanner.nextBoolean();
+ assuredMode = AssuredMode.valueOf(scanner.nextByte());
+ safeDataLevel = scanner.nextByte();
}
/**
@@ -347,10 +263,8 @@
* protocol version.
*
* @return The encoded representation of this update message.
- * @throws UnsupportedEncodingException
- * If the message could not be encoded.
*/
- public byte[] getBytes() throws UnsupportedEncodingException
+ public byte[] getBytes()
{
return getBytes(ProtocolVersion.getCurrentVersion());
}
@@ -364,20 +278,11 @@
*/
@Override
public byte[] getBytes(short protocolVersion)
- throws UnsupportedEncodingException
{
- // Encode the header in a byte[] large enough to also contain the payload
- byte[] resultByteArray = encodeHeader(MSG_TYPE_GENERIC_UPDATE,
- payload.length, ProtocolVersion.getCurrentVersion());
-
- int pos = resultByteArray.length - payload.length;
-
- // Add the payload
- for (int i = 0; i < payload.length; i++, pos++)
- {
- resultByteArray[pos] = payload[i];
- }
- return resultByteArray;
+ final ByteArrayBuilder builder = encodeHeader(MSG_TYPE_GENERIC_UPDATE,
+ ProtocolVersion.getCurrentVersion());
+ builder.append(payload);
+ return builder.toByteArray();
}
/**
diff --git a/opends/src/server/org/opends/server/replication/protocol/WindowMsg.java b/opends/src/server/org/opends/server/replication/protocol/WindowMsg.java
index e638d45..1bcff24 100644
--- a/opends/src/server/org/opends/server/replication/protocol/WindowMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/WindowMsg.java
@@ -22,14 +22,12 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS.
+ * Portions copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
-import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
-
/**
* This message is used by LDAP server or by Replication Servers to
* update the send window of the remote entities.
@@ -43,7 +41,6 @@
{
private final int numAck;
-
/**
* Create a new WindowMsg.
*
@@ -63,64 +60,28 @@
* @throws DataFormatException If the byte array does not contain a valid
* encoded form of the WindowMsg.
*/
- public WindowMsg(byte[] in) throws DataFormatException
+ WindowMsg(byte[] in) throws DataFormatException
{
- /* The WindowMsg is encoded in the form :
- * <numAck>
- */
- try
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ final byte msgType = scanner.nextByte();
+ if (msgType != MSG_TYPE_WINDOW)
{
- /* first byte is the type */
- if (in[0] != MSG_TYPE_WINDOW)
- throw new DataFormatException("input is not a valid Window Message");
- int pos = 1;
-
- /*
- * read the number of acks contained in this message.
- * first calculate the length then construct the string
- */
- int length = getNextLength(in, pos);
- String numAckStr = new String(in, pos, length, "UTF-8");
- pos += length +1;
- numAck = Integer.parseInt(numAckStr);
- } catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ throw new DataFormatException("input is not a valid Window Message");
}
+
+ numAck = scanner.nextIntUTF8();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte[] getBytes(short protocolVersion)
{
- /*
- * WindowMsg contains.
- * <numAck>
- */
- try {
- byte[] byteNumAck = String.valueOf(numAck).getBytes("UTF-8");
-
- int length = 1 + byteNumAck.length + 1;
-
- byte[] resultByteArray = new byte[length];
-
- /* put the type of the operation */
- resultByteArray[0] = MSG_TYPE_WINDOW;
- int pos = 1;
-
- pos = addByteArray(byteNumAck, resultByteArray, pos);
-
- return resultByteArray;
- }
- catch (UnsupportedEncodingException e)
- {
- return null;
- }
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ builder.append(MSG_TYPE_WINDOW);
+ builder.appendUTF8(numAck);
+ return builder.toByteArray();
}
-
/**
* Get the number of message acknowledged by the Window Message.
*
@@ -131,9 +92,7 @@
return numAck;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public String toString()
{
diff --git a/opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java b/opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java
index e26e6b9..6c40582 100644
--- a/opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java
+++ b/opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2009 Sun Microsystems, Inc.
- * Portions Copyright 2013 ForgeRock AS.
+ * Portions Copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.server;
@@ -31,9 +31,7 @@
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.protocol.LDAPUpdateMsg;
-import org.opends.server.replication.protocol.ProtocolVersion;
-import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.protocol.*;
/**
* This is a facility class that is in fact an hack to optimize replication
@@ -48,28 +46,32 @@
* when calling the isAssured() method.
*
*/
-public class NotAssuredUpdateMsg extends UpdateMsg
+class NotAssuredUpdateMsg extends UpdateMsg
{
- // The real update message this message represents
- private UpdateMsg realUpdateMsg = null;
+ /** The real update message this message represents */
+ private final UpdateMsg realUpdateMsg;
- // V1 serialized form of the real message with assured flag set to false.
- // Ready to be sent.
- private byte[] realUpdateMsgNotAssuredBytesV1 = null;
+ /**
+ * V1 serialized form of the real message with assured flag set to false.
+ * Ready to be sent.
+ */
+ private final byte[] realUpdateMsgNotAssuredBytesV1;
- // VLatest serialized form of the real message with assured flag set to false.
- // Ready to be sent.
- private byte[] realUpdateMsgNotAssuredBytesVLatest = null;
+ /**
+ * VLatest serialized form of the real message with assured flag set to false.
+ * Ready to be sent.
+ */
+ private final byte[] realUpdateMsgNotAssuredBytesVLatest;
/**
* Creates a new empty UpdateMsg.
* This class is only used by replication server code so constructor is not
* public by security.
+ *
* @param updateMsg The real underlying update message this object represents.
* @throws UnsupportedEncodingException When the pre-encoding of the message
* failed because the UTF-8 encoding is not supported or the
* requested protocol version to use is not supported by this PDU.
- *
*/
NotAssuredUpdateMsg(UpdateMsg updateMsg) throws UnsupportedEncodingException
{
@@ -85,18 +87,7 @@
* Get the encoding form of the real message then overwrite the assured
* flag to always be false.
*/
- byte[] origBytes = realUpdateMsg.getBytes(
- ProtocolVersion.REPLICATION_PROTOCOL_V1);
- // Clone the byte array to be able to modify it without problems
- // (ModifyMsg messages for instance do not return a cloned version of
- // their byte array)
- byte[] bytes = new byte[origBytes.length];
- System.arraycopy(origBytes, 0, bytes, 0, origBytes.length);
-
- int maxLen = bytes.length;
- int pos;
- int nZeroFound = 0; // Number of 0 value found
- boolean found = false;
+ byte[] bytes = getRealUpdateMsgBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
/* Look for assured flag position:
* The message header is stored in the form :
@@ -107,23 +98,7 @@
* See LDAPUpdateMsg.encodeHeader_V1() for more information
*/
// Find end of CSN then end of dn
- for (pos = 1; pos < maxLen; pos++)
- {
- if (bytes[pos] == (byte) 0)
- {
- nZeroFound++;
- if (nZeroFound == 2) // 2 end of string to find
- {
- found = true;
- break;
- }
- }
- }
- if (!found)
- throw new UnsupportedEncodingException("Could not find end of CSN.");
- pos++;
- if (pos >= maxLen)
- throw new UnsupportedEncodingException("Reached end of packet.");
+ int pos = findNthZeroByte(bytes, 1, 2);
// Force assured flag to false
bytes[pos] = 0;
@@ -135,16 +110,7 @@
* Get the encoding form of the real message then overwrite the assured
* flag to always be false.
*/
- origBytes = realUpdateMsg.getBytes(ProtocolVersion.getCurrentVersion());
- // Clone the byte array to be able to modify it without problems
- // (ModifyMsg messages for instance do not return a cloned version of
- // their byte array)
- bytes = new byte[origBytes.length];
- System.arraycopy(origBytes, 0, bytes, 0, origBytes.length);
-
- maxLen = bytes.length;
- nZeroFound = 0; // Number of 0 value found
- found = false;
+ bytes = getRealUpdateMsgBytes(ProtocolVersion.getCurrentVersion());
/* Look for assured flag position:
* The message header is stored in the form :
@@ -156,48 +122,22 @@
* See LDAPUpdateMsg.encodeHeader() for more information
*/
// Find end of CSN then end of dn then end of uuid
- for (pos = 2; pos < maxLen; pos++)
- {
- if (bytes[pos] == (byte) 0)
- {
- nZeroFound++;
- if (nZeroFound == 3) // 3 end of string to find
- {
- found = true;
- break;
- }
- }
- }
- if (!found)
- throw new UnsupportedEncodingException("Could not find end of CSN.");
- pos++;
- if (pos >= maxLen)
- throw new UnsupportedEncodingException("Reached end of packet.");
+ pos = findNthZeroByte(bytes, 2, 3);
// Force assured flag to false
bytes[pos] = 0;
// Store computed VLATEST serialized form
realUpdateMsgNotAssuredBytesVLatest = bytes;
-
- } else
+ }
+ else
{
+ realUpdateMsgNotAssuredBytesV1 = null;
/**
* Prepare VLATEST serialized form of the message:
* Get the encoding form of the real message then overwrite the assured
* flag to always be false.
*/
- byte[] origBytes = realUpdateMsg.getBytes(
- ProtocolVersion.getCurrentVersion());
- // Clone the byte array to be able to modify it without problems
- // (ModifyMsg messages for instance do not return a cloned version of
- // their byte array)
- byte[] bytes = new byte[origBytes.length];
- System.arraycopy(origBytes, 0, bytes, 0, origBytes.length);
-
- int maxLen = bytes.length;
- int pos;
- int nZeroFound = 0; // Number of 0 value found
- boolean found = false;
+ byte[] bytes = getRealUpdateMsgBytes(ProtocolVersion.getCurrentVersion());
// This is a generic update message
/* Look for assured flag position:
@@ -210,23 +150,7 @@
* See UpdateMsg.encodeHeader() for more information
*/
// Find end of CSN
- for (pos = 2; pos < maxLen; pos++)
- {
- if (bytes[pos] == (byte) 0)
- {
- nZeroFound++;
- if (nZeroFound == 1) // 1 end of string to find
- {
- found = true;
- break;
- }
- }
- }
- if (!found)
- throw new UnsupportedEncodingException("Could not find end of CSN.");
- pos++;
- if (pos >= maxLen)
- throw new UnsupportedEncodingException("Reached end of packet.");
+ int pos = findNthZeroByte(bytes, 2, 1);
// Force assured flag to false
bytes[pos] = 0;
@@ -236,17 +160,52 @@
}
/**
- * {@inheritDoc}
+ * Clones the byte array to be able to modify it without problems
+ * (ModifyMsg messages for instance do not return a cloned version of
+ * their byte array).
*/
+ private byte[] getRealUpdateMsgBytes(final short protocolVersion)
+ {
+ byte[] origBytes = realUpdateMsg.getBytes(protocolVersion);
+ byte[] bytes = new byte[origBytes.length];
+ System.arraycopy(origBytes, 0, bytes, 0, origBytes.length);
+ return bytes;
+ }
+
+ private int findNthZeroByte(byte[] bytes, int startPos, int nbToFind)
+ throws UnsupportedEncodingException
+ {
+ final int maxLen = bytes.length;
+ int nbZeroFound = 0; // Number of 0 values found
+ for (int pos = startPos; pos < maxLen; pos++)
+ {
+ if (bytes[pos] == (byte) 0)
+ {
+ nbZeroFound++;
+ if (nbZeroFound == nbToFind)
+ {
+ // nb of end of strings reached
+ pos++;
+ if (pos >= maxLen)
+ {
+ throw new UnsupportedEncodingException("Reached end of packet.");
+ }
+ return pos;
+ }
+ }
+ }
+ throw new UnsupportedEncodingException(
+ "Could not find " + nbToFind + " zero bytes in byte array.");
+ }
+
+ /** {@inheritDoc} */
@Override
public CSN getCSN()
{
return realUpdateMsg.getCSN();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public boolean isAssured()
{
@@ -254,9 +213,7 @@
return false;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public void setAssured(boolean assured)
{
@@ -264,78 +221,59 @@
// and we do not want to change the original real update message settings
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public boolean equals(Object obj)
{
// Compare with the underlying real update message
- if (obj != null)
- {
- if (obj.getClass() != realUpdateMsg.getClass())
- return false;
- return realUpdateMsg.getCSN().
- equals(((UpdateMsg)obj).getCSN());
- }
- else
+ if (obj == null)
{
return false;
}
+ return obj.getClass() == realUpdateMsg.getClass()
+ && realUpdateMsg.getCSN().equals(((UpdateMsg) obj).getCSN());
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public int hashCode()
{
return realUpdateMsg.hashCode();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public int compareTo(UpdateMsg msg)
{
return realUpdateMsg.compareTo(msg);
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
- public byte[] getBytes(short reqProtocolVersion)
- throws UnsupportedEncodingException
+ public byte[] getBytes(short protocolVersion)
{
- if (reqProtocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
+ if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
+ {
return realUpdateMsgNotAssuredBytesV1;
- else
- return realUpdateMsgNotAssuredBytesVLatest;
+ }
+ return realUpdateMsgNotAssuredBytesVLatest;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public AssuredMode getAssuredMode()
{
return realUpdateMsg.getAssuredMode();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte getSafeDataLevel()
{
return realUpdateMsg.getSafeDataLevel();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public void setAssuredMode(AssuredMode assuredMode)
{
@@ -343,9 +281,7 @@
// and we do not want to change the original real update message settings
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public void setSafeDataLevel(byte safeDataLevel)
{
@@ -353,49 +289,38 @@
// and we do not want to change the original real update message settings
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public short getVersion()
{
return realUpdateMsg.getVersion();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public int size()
{
return realUpdateMsg.size();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
- protected byte[] encodeHeader(byte type, int additionalLength, short version)
- throws UnsupportedEncodingException
+ protected ByteArrayBuilder encodeHeader(byte allowedType,
+ short protocolVersion)
{
// Not called as only used by constructors using bytes
return null;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
- public int decodeHeader(byte type, byte[] encodedMsg)
- throws DataFormatException
+ protected void decodeHeader(byte msgType, ByteArrayScanner scanner)
+ throws DataFormatException
{
// Not called as only used by getBytes methods
- return -1;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte[] getPayload()
{
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
index 0525996..988e65c 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -25,7 +25,6 @@
*/
package org.opends.server.replication.server.changelog.file;
-import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -35,7 +34,6 @@
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
-import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -48,11 +46,9 @@
import org.opends.server.types.Attributes;
import org.opends.server.types.ByteString;
import org.opends.server.types.DN;
-import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.InitializationException;
import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
/**
* Represents a replication server database for one server in the topology.
@@ -370,22 +366,12 @@
/** Parser of records persisted in the ReplicaDB log. */
private static class ReplicaDBParser implements RecordParser<CSN, UpdateMsg>
{
- private static final DebugTracer TRACER = getTracer();
@Override
public ByteString encodeRecord(final Record<CSN, UpdateMsg> record)
{
final UpdateMsg message = record.getValue();
- try
- {
- return ByteString.wrap(message.getBytes());
- }
- catch (UnsupportedEncodingException e)
- {
- // should never happen
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- return ByteString.empty();
- }
+ return ByteString.wrap(message.getBytes());
}
@Override
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
index 71becb1..afc94f0 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -34,6 +34,8 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
@@ -56,14 +58,14 @@
* <p>
* This is the only class that should have code using the BDB interfaces.
*/
-public class ReplicationDB
+class ReplicationDB
{
private Database db;
- private ReplicationDbEnv dbEnv;
- private ReplicationServer replicationServer;
- private int serverId;
- private DN baseDN;
+ private final ReplicationDbEnv dbEnv;
+ private final ReplicationServer replicationServer;
+ private final int serverId;
+ private final DN baseDN;
/**
* The lock used to provide exclusive access to the thread that close the db
@@ -120,7 +122,7 @@
* @param dbEnv The Db environment to use to create the db.
* @throws ChangelogException If a database problem happened
*/
- public ReplicationDB(int serverId, DN baseDN,
+ ReplicationDB(int serverId, DN baseDN,
ReplicationServer replicationServer, ReplicationDbEnv dbEnv)
throws ChangelogException
{
@@ -188,7 +190,7 @@
* @throws ChangelogException
* If a database problem happened
*/
- public void addEntry(UpdateMsg change) throws ChangelogException
+ void addEntry(UpdateMsg change) throws ChangelogException
{
dbCloseLock.readLock().lock();
try
@@ -200,7 +202,9 @@
}
final DatabaseEntry key = createReplicationKey(change.getCSN());
- final DatabaseEntry data = new ReplicationData(change);
+ // Always keep messages in the replication DB with the current protocol
+ // version
+ final DatabaseEntry data = new DatabaseEntry(change.getBytes());
insertCounterRecordIfNeeded(change.getCSN());
db.put(null, key, data);
@@ -256,7 +260,7 @@
/**
* Shutdown the database.
*/
- public void shutdown()
+ void shutdown()
{
dbCloseLock.writeLock().lock();
try
@@ -286,8 +290,7 @@
* @throws ChangelogException
* If a database problem happened
*/
- public ReplServerDBCursor openReadCursor(CSN startCSN)
- throws ChangelogException
+ ReplServerDBCursor openReadCursor(CSN startCSN) throws ChangelogException
{
return new ReplServerDBCursor(startCSN);
}
@@ -301,7 +304,7 @@
*
* @return The ReplServerDBCursor.
*/
- public ReplServerDBCursor openDeleteCursor() throws ChangelogException
+ ReplServerDBCursor openDeleteCursor() throws ChangelogException
{
return new ReplServerDBCursor();
}
@@ -325,7 +328,7 @@
* @throws ChangelogException
* If a database problem happened
*/
- public CSN readOldestCSN() throws ChangelogException
+ CSN readOldestCSN() throws ChangelogException
{
dbCloseLock.readLock().lock();
@@ -381,7 +384,7 @@
* @throws ChangelogException
* If a database problem happened
*/
- public CSN readNewestCSN() throws ChangelogException
+ CSN readNewestCSN() throws ChangelogException
{
dbCloseLock.readLock().lock();
@@ -432,93 +435,7 @@
}
}
- /**
- * Try to find in the DB, the CSN right before the one passed as a parameter.
- *
- * @param csn
- * The CSN from which we start searching.
- * @return the CSN right before the one passed as a parameter. Can return null
- * if there is none.
- * @throws ChangelogException
- * If a database problem happened
- */
- public CSN getPreviousCSN(CSN csn) throws ChangelogException
- {
- if (csn == null)
- {
- return null;
- }
-
- dbCloseLock.readLock().lock();
-
- Cursor cursor = null;
- try
- {
- // If the DB has been closed then return immediately.
- if (isDBClosed())
- {
- return null;
- }
-
- DatabaseEntry key = createReplicationKey(csn);
- DatabaseEntry data = new DatabaseEntry();
- cursor = db.openCursor(null, null);
- if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) == SUCCESS)
- {
- // We can move close to the CSN.
- // Let's move to the previous change.
- if (cursor.getPrev(key, data, LockMode.DEFAULT) == SUCCESS)
- {
- return getRegularRecord(cursor, key, data);
- }
- // else, there was no change previous to our CSN.
- }
- else
- {
- // We could not move the cursor past to the CSN
- // Check if the last change is older than CSN
- if (cursor.getLast(key, data, LockMode.DEFAULT) == SUCCESS)
- {
- return getRegularRecord(cursor, key, data);
- }
- }
- }
- catch (DatabaseException e)
- {
- throw new ChangelogException(e);
- }
- finally
- {
- closeAndReleaseReadLock(cursor);
- }
- return null;
- }
-
- private CSN getRegularRecord(Cursor cursor, DatabaseEntry key,
- DatabaseEntry data) throws DatabaseException
- {
- final CSN csn = toCSN(key.getData());
- if (!isACounterRecord(csn))
- {
- return csn;
- }
-
- // There cannot be 2 counter record next to each other,
- // it is safe to return previous record which must exist
- if (cursor.getPrev(key, data, LockMode.DEFAULT) == SUCCESS)
- {
- return toCSN(key.getData());
- }
-
- // database only contain a counter record, which should not be possible
- // let's just say no CSN
- return null;
- }
-
-
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public String toString()
{
@@ -529,7 +446,7 @@
* This Class implements a cursor that can be used to browse a
* replicationServer database.
*/
- public class ReplServerDBCursor implements Closeable
+ class ReplServerDBCursor implements Closeable
{
/**
* The transaction that will protect the actions done with the cursor.
@@ -713,7 +630,7 @@
* (per the Cursor documentation).
* This should not be used in any other case.
*/
- public void abort()
+ void abort()
{
synchronized (this)
{
@@ -735,7 +652,7 @@
* @throws ChangelogException
* In case of underlying database problem.
*/
- public CSN nextCSN() throws ChangelogException
+ CSN nextCSN() throws ChangelogException
{
if (isClosed)
{
@@ -761,7 +678,7 @@
*
* @return the next UpdateMsg.
*/
- public UpdateMsg next()
+ UpdateMsg next()
{
if (isClosed)
{
@@ -791,7 +708,8 @@
{
continue;
}
- currentChange = ReplicationData.generateChange(data.getData());
+ currentChange = (UpdateMsg) ReplicationMsg.generateMsg(
+ data.getData(), ProtocolVersion.getCurrentVersion());
}
catch (Exception e)
{
@@ -806,7 +724,7 @@
*/
Message message = ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD
.get(replicationServer.getServerId(),
- (csn == null ? "" : csn.toString()),
+ (csn != null ? csn.toString() : ""),
e.getMessage());
logError(message);
}
@@ -819,7 +737,7 @@
*
* @throws ChangelogException In case of database problem.
*/
- public void delete() throws ChangelogException
+ void delete() throws ChangelogException
{
if (isClosed)
{
@@ -842,7 +760,7 @@
*
* @throws ChangelogException In case of database problem.
*/
- public void clear() throws ChangelogException
+ void clear() throws ChangelogException
{
// The coming users will be blocked until the clear is done
dbCloseLock.writeLock().lock();
@@ -912,7 +830,7 @@
* Encode the provided counter value in a database entry.
* @return The database entry with the counter value encoded inside.
*/
- static private DatabaseEntry encodeCounterValue(int value)
+ private static DatabaseEntry encodeCounterValue(int value)
{
DatabaseEntry entry = new DatabaseEntry();
entry.setData(getBytes(String.valueOf(value)));
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationData.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationData.java
deleted file mode 100644
index a854328..0000000
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationData.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2010-2011 ForgeRock AS.
- */
-package org.opends.server.replication.server.changelog.je;
-
-import java.io.UnsupportedEncodingException;
-
-import com.sleepycat.je.DatabaseEntry;
-
-import org.opends.server.replication.protocol.ProtocolVersion;
-import org.opends.server.replication.protocol.ReplicationMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
-
-/**
- * SuperClass of DatabaseEntry used for data stored in the ReplicationServer
- * Databases.
- */
-public class ReplicationData extends DatabaseEntry
-{
- private static final long serialVersionUID = 1L;
-
- /**
- * Creates a new ReplicationData object from an UpdateMsg.
- *
- * @param change the UpdateMsg used to create the ReplicationData.
- */
- public ReplicationData(UpdateMsg change)
- {
- // Always keep messages in the replication DB with the current protocol
- // version
- try
- {
- this.setData(change.getBytes());
- }
- catch (UnsupportedEncodingException e)
- {
- // This should not happen - UTF-8 is always available.
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Generate an UpdateMsg from its byte[] form.
- *
- * @param data The DatabaseEntry used to generate the UpdateMsg.
- *
- * @return The generated change.
- *
- * @throws Exception When the data was not a valid Update Message.
- */
- public static UpdateMsg generateChange(byte[] data)
- throws Exception
- {
- return (UpdateMsg) ReplicationMsg.generateMsg(
- data, ProtocolVersion.getCurrentVersion());
- }
-}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
index 68cf2c1..db382e6 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2013 ForgeRock AS
+ * Portions Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.common;
@@ -93,11 +93,6 @@
String stringRep = serverState.toString();
assertTrue(stringRep.contains(csn2.toString()));
assertTrue(stringRep.contains(csn3.toString()));
-
- // Check getBytes
- byte[] b = serverState.getBytes();
- ServerState generatedServerState = new ServerState(b,0,b.length -1) ;
- assertEquals(b, generatedServerState.getBytes()) ;
}
/**
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ByteArrayTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ByteArrayTest.java
index 21133fc..bce3eb3 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ByteArrayTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ByteArrayTest.java
@@ -27,13 +27,22 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Iterator;
+import java.util.zip.DataFormatException;
import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.TestCaseUtils;
import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.ServerState;
import org.opends.server.types.ByteStringBuilder;
-import org.testng.Assert;
+import org.opends.server.types.DN;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import static org.testng.Assert.*;
+
/**
* Test for {@link ByteStringBuilder} and {@link ByteArrayScanner} classes.
*/
@@ -41,44 +50,194 @@
public class ByteArrayTest extends DirectoryServerTestCase
{
+ private static final class IntegerRange implements Iterator<Object[]>
+ {
+ private int next;
+ private final int endInclusive;
+
+ public IntegerRange(int startInclusive, int endInclusive)
+ {
+ this.next = startInclusive;
+ this.endInclusive = endInclusive;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return next <= this.endInclusive;
+ }
+
+ @Override
+ public Object[] next()
+ {
+ return new Object[] { next++ };
+ }
+
+ @Override
+ public void remove() { /* unused */ }
+ }
+
+ @BeforeClass
+ public void setup() throws Exception
+ {
+ TestCaseUtils.startFakeServer();
+ }
+
+ @AfterClass
+ public void teardown() throws Exception
+ {
+ TestCaseUtils.shutdownFakeServer();
+ }
+
+ private final byte[] byteArray = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, };
+
@Test
public void testBuilderAppendMethodsAndScannerNextMethods() throws Exception
{
- final boolean bo = true;
+ final boolean boFalse = false;
+ final boolean boTrue = true;
final byte by = 80;
final short sh = 42;
final int i = sh + 1;
final long l = i + 1;
- final String st = "Yay!";
+ final String nullStr = null;
+ final String str = "Yay!";
final Collection<String> col = Arrays.asList("foo", "bar", "baz");
final CSN csn = new CSN(42424242, 13, 42);
+ final DN dn = DN.decode("dc=example,dc=com");
+ final ServerState ss = new ServerState();
+ ss.update(csn);
byte[] bytes = new ByteArrayBuilder()
- .append(bo)
+ .append(boTrue)
+ .append(boFalse)
.append(by)
.append(sh)
.append(i)
.append(l)
- .append(st)
+ .append(nullStr)
+ .append(str)
.appendStrings(col)
.appendUTF8(i)
.appendUTF8(l)
.append(csn)
.appendUTF8(csn)
+ .append(dn)
+ .appendZeroTerminated(byteArray)
+ .append(byteArray)
+ .append(ss)
.toByteArray();
final ByteArrayScanner scanner = new ByteArrayScanner(bytes);
- Assert.assertEquals(scanner.nextBoolean(), bo);
- Assert.assertEquals(scanner.nextByte(), by);
- Assert.assertEquals(scanner.nextShort(), sh);
- Assert.assertEquals(scanner.nextInt(), i);
- Assert.assertEquals(scanner.nextLong(), l);
- Assert.assertEquals(scanner.nextString(), st);
- Assert.assertEquals(scanner.nextStrings(new ArrayList<String>()), col);
- Assert.assertEquals(scanner.nextIntUTF8(), i);
- Assert.assertEquals(scanner.nextLongUTF8(), l);
- Assert.assertEquals(scanner.nextCSN(), csn);
- Assert.assertEquals(scanner.nextCSNUTF8(), csn);
- Assert.assertTrue(scanner.isEmpty());
+ assertFalse(scanner.isEmpty());
+ assertEquals(scanner.nextBoolean(), boTrue);
+ assertEquals(scanner.nextBoolean(), boFalse);
+ assertEquals(scanner.nextByte(), by);
+ assertEquals(scanner.nextShort(), sh);
+ assertEquals(scanner.nextInt(), i);
+ assertEquals(scanner.nextLong(), l);
+ assertEquals(scanner.nextString(), nullStr);
+ assertEquals(scanner.nextString(), str);
+ assertEquals(scanner.nextStrings(new ArrayList<String>()), col);
+ assertEquals(scanner.nextIntUTF8(), i);
+ assertEquals(scanner.nextLongUTF8(), l);
+ assertEquals(scanner.nextCSN(), csn);
+ assertEquals(scanner.nextCSNUTF8(), csn);
+ assertEquals(scanner.nextDN(), dn);
+ assertEquals(scanner.nextByteArray(byteArray.length), byteArray);
+ scanner.skipZeroSeparator();
+ assertEquals(scanner.nextByteArray(byteArray.length), byteArray);
+ assertEquals(scanner.nextServerState().toString(), ss.toString());
+ assertTrue(scanner.isEmpty());
}
+
+ @Test
+ public void testByteArrayScanner_remainingBytes() throws Exception
+ {
+ final byte[] bytes = new ByteArrayBuilder().append(byteArray).toByteArray();
+
+ final ByteArrayScanner scanner = new ByteArrayScanner(bytes);
+ assertEquals(scanner.remainingBytes(), byteArray);
+ assertTrue(scanner.isEmpty());
+ }
+
+ @Test
+ public void testByteArrayScanner_remainingBytesZeroTerminated() throws Exception
+ {
+ final byte[] bytes =
+ new ByteArrayBuilder().appendZeroTerminated(byteArray).toByteArray();
+
+ final ByteArrayScanner scanner = new ByteArrayScanner(bytes);
+ assertEquals(scanner.remainingBytesZeroTerminated(), byteArray);
+ assertTrue(scanner.isEmpty());
+ }
+
+ @DataProvider
+ public Iterator<Object[]> testCasesForNextMethodsWithEmptyByteArray()
+ {
+ return new IntegerRange(0, 7);
+ }
+
+ @Test(dataProvider = "testCasesForNextMethodsWithEmptyByteArray",
+ expectedExceptions = DataFormatException.class)
+ public void testByteArrayScanner_nextMethods_throwsExceptionWhenNoData(int testNumber) throws Exception
+ {
+ delegate(testNumber);
+ }
+
+ /**
+ * TestNG does not like test methods with a return type other than void,
+ * so used a delegate to simplify the code down below.
+ */
+ private Object delegate(int testNumber) throws DataFormatException
+ {
+ final ByteArrayScanner scanner = new ByteArrayScanner(new byte[0]);
+ switch (testNumber)
+ {
+ case 0:
+ return scanner.nextByte();
+ case 1:
+ return scanner.nextBoolean();
+ case 2:
+ return scanner.nextShort();
+ case 3:
+ return scanner.nextInt();
+ case 4:
+ return scanner.nextIntUTF8();
+ case 5:
+ return scanner.nextLong();
+ case 6:
+ return scanner.nextLongUTF8();
+ case 7:
+ return scanner.nextCSN();
+ default:
+ return null;
+ }
+ }
+
+ @Test(expectedExceptions = IndexOutOfBoundsException.class)
+ public void testByteArrayScanner_skipZeroSeparator_throwsExceptionWhenNoData() throws Exception
+ {
+ new ByteArrayScanner(new byte[0]).skipZeroSeparator();
+ }
+
+ @Test(expectedExceptions = DataFormatException.class)
+ public void testByteArrayScanner_skipZeroSeparator_throwsExceptionWhenNoZeroSeparator() throws Exception
+ {
+ new ByteArrayScanner(new byte[] { 1 }).skipZeroSeparator();
+ }
+
+ @Test(expectedExceptions = DataFormatException.class)
+ public void testByteArrayScanner_nextCSNUTF8_throwsExceptionWhenInvalidCSN() throws Exception
+ {
+ new ByteArrayScanner(new byte[] { 1, 0 }).nextCSNUTF8();
+ }
+
+ @Test(expectedExceptions = DataFormatException.class)
+ public void testByteArrayScanner_nextDN_throwsExceptionWhenInvalidDN() throws Exception
+ {
+ final byte[] bytes = new ByteArrayBuilder().append("this is not a valid DN").toByteArray();
+ new ByteArrayScanner(bytes).nextDN();
+ }
+
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index f4c2d72..62c41a7 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -38,6 +38,7 @@
import org.opends.server.core.ModifyDNOperationBasis;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.common.ServerState;
@@ -894,9 +895,9 @@
}
// Send StartSessionMsg
- StartSessionMsg startSessionMsg =
- new StartSessionMsg(ServerStatus.NORMAL_STATUS,
- new ArrayList<String>());
+ StartSessionMsg startSessionMsg = new StartSessionMsg(
+ ServerStatus.NORMAL_STATUS, new ArrayList<String>(),
+ false, AssuredMode.SAFE_DATA_MODE, (byte) 1);
session.publish(startSessionMsg);
// Read the TopologyMsg that should come back.
--
Gitblit v1.10.0