From 5a0befe36d7b7e93a65bb19df19d667d413c7c89 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 30 Apr 2014 15:07:20 +0000
Subject: [PATCH] OPENDJ-1259 (CR-3443) Make the Medium Consistency Point support replicas temporarily leaving the topology
---
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 22
opendj3-server-dev/src/server/org/opends/server/replication/protocol/ByteArrayBuilder.java | 348 +++++++++++++
opendj3-server-dev/src/server/org/opends/server/replication/server/ChangelogState.java | 37 +
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 51 +
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java | 4
opendj3-server-dev/src/server/org/opends/server/replication/protocol/ByteArrayScanner.java | 281 +++++++++++
opendj3-server-dev/src/server/org/opends/server/replication/protocol/StopMsg.java | 13
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ByteArrayTest.java | 83 +++
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 11
opendj3-server-dev/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java | 108 ++-
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java | 69 +
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java | 15
opendj3-server-dev/src/server/org/opends/server/replication/protocol/ProtocolVersion.java | 53 +
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java | 142 ++++
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java | 155 +++--
opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java | 15
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java | 8
opendj3-server-dev/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java | 37 +
18 files changed, 1,252 insertions(+), 200 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ByteArrayBuilder.java b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ByteArrayBuilder.java
new file mode 100644
index 0000000..96089c4
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ByteArrayBuilder.java
@@ -0,0 +1,348 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.protocol;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Collection;
+
+import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.opends.server.replication.common.CSN;
+
+/**
+ * Byte array builder class encodes data into byte arrays to send messages over
+ * the replication protocol. Built on top of {@link ByteStringBuilder}, it
+ * isolates the latter against legacy type conversions from the replication
+ * protocol. It exposes a fluent API.
+ *
+ * @see ByteArrayScanner ByteArrayScanner class that decodes messages built with
+ * current class.
+ */
+public class ByteArrayBuilder
+{
+
+ /** This is the null byte, also known as zero byte. */
+ public static final byte NULL_BYTE = 0;
+ private final ByteStringBuilder builder;
+
+ /**
+ * Constructs a ByteArrayBuilder.
+ */
+ public ByteArrayBuilder()
+ {
+ builder = new ByteStringBuilder();
+ }
+
+ /**
+ * Constructs a ByteArrayBuilder.
+ *
+ * @param capacity
+ * the capacity of the underlying ByteStringBuilder
+ */
+ public ByteArrayBuilder(int capacity)
+ {
+ builder = new ByteStringBuilder(capacity);
+ }
+
+ /**
+ * Append a boolean to this ByteArrayBuilder.
+ *
+ * @param b
+ * the boolean to append.
+ * @return this ByteArrayBuilder
+ */
+ public ByteArrayBuilder append(boolean b)
+ {
+ append((byte) (b ? 1 : 0));
+ return this;
+ }
+
+ /**
+ * Append a byte to this ByteArrayBuilder.
+ *
+ * @param b
+ * the byte to append.
+ * @return this ByteArrayBuilder
+ */
+ public ByteArrayBuilder append(byte b)
+ {
+ builder.append(b);
+ return this;
+ }
+
+ /**
+ * Append a short to this ByteArrayBuilder.
+ *
+ * @param s
+ * the short to append.
+ * @return this ByteArrayBuilder
+ */
+ public ByteArrayBuilder append(short s)
+ {
+ builder.append(s);
+ return this;
+ }
+
+ /**
+ * Append an int to this ByteArrayBuilder.
+ *
+ * @param i
+ * the long to append.
+ * @return this ByteArrayBuilder
+ */
+ public ByteArrayBuilder append(int i)
+ {
+ builder.append(i);
+ return this;
+ }
+
+ /**
+ * Append a long to this ByteArrayBuilder.
+ *
+ * @param l
+ * the long to append.
+ * @return this ByteArrayBuilder
+ */
+ public ByteArrayBuilder append(long l)
+ {
+ builder.append(l);
+ return this;
+ }
+
+ /**
+ * Append an int to this ByteArrayBuilder by converting it to a String then
+ * encoding that string to a UTF-8 byte array.
+ *
+ * @param i
+ * the int to append.
+ * @return this ByteArrayBuilder
+ */
+ public ByteArrayBuilder appendUTF8(int i)
+ {
+ return append(Integer.toString(i));
+ }
+
+ /**
+ * Append a long to this ByteArrayBuilder by converting it to a String then
+ * encoding that string to a UTF-8 byte array.
+ *
+ * @param l
+ * the long to append.
+ * @return this ByteArrayBuilder
+ */
+ public ByteArrayBuilder appendUTF8(long l)
+ {
+ return append(Long.toString(l));
+ }
+
+ /**
+ * Append a Collection of Strings to this ByteArrayBuilder.
+ *
+ * @param col
+ * the Collection of Strings to append.
+ * @return this ByteArrayBuilder
+ */
+ public ByteArrayBuilder appendStrings(Collection<String> col)
+ {
+ append(col.size());
+ for (String s : col)
+ {
+ append(s);
+ }
+ return this;
+ }
+
+ /**
+ * Append a String to this ByteArrayBuilder.
+ *
+ * @param s
+ * the String to append.
+ * @return this ByteArrayBuilder
+ */
+ public ByteArrayBuilder append(String s)
+ {
+ try
+ {
+ append(s.getBytes("UTF-8"));
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new RuntimeException("Should never happen", e);
+ }
+ return this;
+ }
+
+ /**
+ * Append a CSN to this ByteArrayBuilder.
+ *
+ * @param csn
+ * the CSN to append.
+ * @return this ByteArrayBuilder
+ */
+ public ByteArrayBuilder append(CSN csn)
+ {
+ csn.toByteString(builder);
+ return this;
+ }
+
+ /**
+ * Append a CSN to this ByteArrayBuilder by converting it to a String then
+ * encoding that string to a UTF-8 byte array.
+ *
+ * @param csn
+ * the CSN to append.
+ * @return this ByteArrayBuilder
+ */
+ public ByteArrayBuilder appendUTF8(CSN csn)
+ {
+ append(csn.toString());
+ return this;
+ }
+
+ private ByteArrayBuilder append(byte[] sBytes)
+ {
+ for (byte b : sBytes)
+ {
+ append(b);
+ }
+ append((byte) 0); // zero separator
+ return this;
+ }
+
+ /**
+ * Converts the content of this ByteStringBuilder to a byte array.
+ *
+ * @return the content of this ByteStringBuilder converted to a byte array.
+ */
+ public byte[] toByteArray()
+ {
+ return builder.toByteArray();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return builder.toString();
+ }
+
+ /**
+ * Helper method that returns the number of bytes that would be used by the
+ * boolean fields when appended to a ByteArrayBuilder.
+ *
+ * @param nbFields
+ * the number of boolean fields that will be appended to a
+ * ByteArrayBuilder
+ * @return the number of bytes occupied by the appended boolean fields.
+ */
+ public static int booleans(int nbFields)
+ {
+ return nbFields;
+ }
+
+ /**
+ * Helper method that returns the number of bytes that would be used by the
+ * byte fields when appended to a ByteArrayBuilder.
+ *
+ * @param nbFields
+ * the number of byte fields that will be appended to a
+ * ByteArrayBuilder
+ * @return the number of bytes occupied by the appended byte fields.
+ */
+ public static int bytes(int nbFields)
+ {
+ return nbFields;
+ }
+
+ /**
+ * Helper method that returns the number of bytes that would be used by the
+ * short fields when appended to a ByteArrayBuilder.
+ *
+ * @param nbFields
+ * the number of short fields that will be appended to a
+ * ByteArrayBuilder
+ * @return the number of bytes occupied by the appended short fields.
+ */
+ public static int shorts(int nbFields)
+ {
+ return 2 * nbFields;
+ }
+
+ /**
+ * Helper method that returns the number of bytes that would be used by the
+ * int fields when appended to a ByteArrayBuilder.
+ *
+ * @param nbFields
+ * the number of int fields that will be appended to a
+ * ByteArrayBuilder
+ * @return the number of bytes occupied by the appended int fields.
+ */
+ public static int ints(int nbFields)
+ {
+ return 4 * nbFields;
+ }
+
+ /**
+ * Helper method that returns the number of bytes that would be used by the
+ * long fields when appended to a ByteArrayBuilder.
+ *
+ * @param nbFields
+ * the number of long fields that will be appended to a
+ * ByteArrayBuilder
+ * @return the number of bytes occupied by the appended long fields.
+ */
+ public static int longs(int nbFields)
+ {
+ return 8 * nbFields;
+ }
+
+ /**
+ * Helper method that returns the number of bytes that would be used by the
+ * CSN fields when appended to a ByteArrayBuilder.
+ *
+ * @param nbFields
+ * the number of CSN fields that will be appended to a
+ * ByteArrayBuilder
+ * @return the number of bytes occupied by the appended CSN fields.
+ */
+ public static int csns(int nbFields)
+ {
+ return CSN.BYTE_ENCODING_LENGTH * nbFields;
+ }
+
+ /**
+ * Helper method that returns the number of bytes that would be used by the
+ * CSN fields encoded as a UTF8 string when appended to a ByteArrayBuilder.
+ *
+ * @param nbFields
+ * the number of CSN fields that will be appended to a
+ * ByteArrayBuilder
+ * @return the number of bytes occupied by the appended legacy-encoded CSN
+ * fields.
+ */
+ public static int csnsUTF8(int nbFields)
+ {
+ return CSN.STRING_ENCODING_LENGTH * nbFields + 1 /* null byte */;
+ }
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ByteArrayScanner.java b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ByteArrayScanner.java
new file mode 100644
index 0000000..13be682
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ByteArrayScanner.java
@@ -0,0 +1,281 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.protocol;
+
+import java.util.Collection;
+import java.util.zip.DataFormatException;
+
+import org.forgerock.opendj.ldap.ByteSequenceReader;
+import org.forgerock.opendj.ldap.ByteString;
+import org.opends.server.replication.common.CSN;
+
+/**
+ * Byte array scanner class helps decode data from byte arrays received via
+ * messages over the replication protocol. Built on top of
+ * {@link ByteSequenceReader}, it isolates the latter against legacy type
+ * conversions from the replication protocol.
+ *
+ * @see ByteArrayBuilder ByteArrayBuilder class that encodes messages read with
+ * current class.
+ */
+public class ByteArrayScanner
+{
+
+ private final ByteSequenceReader bytes;
+
+ /**
+ * Builds a ByteArrayScanner object that will read from the supplied byte
+ * array.
+ *
+ * @param bytes
+ * the byte array input that will be read from
+ */
+ public ByteArrayScanner(byte[] bytes)
+ {
+ this.bytes = ByteString.wrap(bytes).asReader();
+ }
+
+ /**
+ * Reads the next boolean.
+ *
+ * @return the next boolean
+ * @throws DataFormatException
+ * if no more data can be read from the input
+ */
+ public boolean nextBoolean() throws DataFormatException
+ {
+ return nextByte() != 0;
+ }
+
+ /**
+ * Reads the next byte.
+ *
+ * @return the next byte
+ * @throws DataFormatException
+ * if no more data can be read from the input
+ */
+ public byte nextByte() throws DataFormatException
+ {
+ try
+ {
+ return bytes.get();
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ throw new DataFormatException(e.getMessage());
+ }
+ }
+
+ /**
+ * Reads the next short.
+ *
+ * @return the next short
+ * @throws DataFormatException
+ * if no more data can be read from the input
+ */
+ public short nextShort() throws DataFormatException
+ {
+ try
+ {
+ return bytes.getShort();
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ throw new DataFormatException(e.getMessage());
+ }
+ }
+
+ /**
+ * Reads the next int.
+ *
+ * @return the next int
+ * @throws DataFormatException
+ * if no more data can be read from the input
+ */
+ public int nextInt() throws DataFormatException
+ {
+ try
+ {
+ return bytes.getInt();
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ throw new DataFormatException(e.getMessage());
+ }
+ }
+
+ /**
+ * Reads the next long.
+ *
+ * @return the next long
+ * @throws DataFormatException
+ * if no more data can be read from the input
+ */
+ public long nextLong() throws DataFormatException
+ {
+ try
+ {
+ return bytes.getLong();
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ throw new DataFormatException(e.getMessage());
+ }
+ }
+
+ /**
+ * Reads the next int that was encoded as a UTF8 string.
+ *
+ * @return the next int that was encoded as a UTF8 string.
+ * @throws DataFormatException
+ * if no more data can be read from the input
+ */
+ public int nextIntUTF8() throws DataFormatException
+ {
+ return Integer.valueOf(nextString());
+ }
+
+ /**
+ * Reads the next long that was encoded as a UTF8 string.
+ *
+ * @return the next long that was encoded as a UTF8 string.
+ * @throws DataFormatException
+ * if no more data can be read from the input
+ */
+ public long nextLongUTF8() throws DataFormatException
+ {
+ return Long.valueOf(nextString());
+ }
+
+ /**
+ * Reads the next UTF8-encoded string.
+ *
+ * @return the next UTF8-encoded string.
+ * @throws DataFormatException
+ * if no more data can be read from the input
+ */
+ public String nextString() throws DataFormatException
+ {
+ try
+ {
+ final String s = bytes.getString(findZeroSeparator());
+ bytes.skip(1); // skip the zero separator
+ return s;
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ throw new DataFormatException(e.getMessage());
+ }
+ }
+
+ private int findZeroSeparator() throws DataFormatException
+ {
+ int offset = 0;
+ final int remaining = bytes.remaining();
+ while (bytes.peek(offset) != 0 && offset < remaining)
+ {
+ offset++;
+ }
+ if (offset == remaining)
+ {
+ throw new DataFormatException("No more data to read from");
+ }
+ return offset;
+ }
+
+ /**
+ * Reads the next UTF8-encoded strings in the provided collection.
+ *
+ * @param output
+ * the collection where to add the next UTF8-encoded strings
+ * @param <TCol>
+ * the collection's concrete type
+ * @return the provided collection where the next UTF8-encoded strings have
+ * been added.
+ * @throws DataFormatException
+ * if no more data can be read from the input
+ */
+ public <TCol extends Collection<String>> TCol nextStrings(TCol output)
+ throws DataFormatException
+ {
+ final int colSize = nextInt();
+ for (int i = 0; i < colSize; i++)
+ {
+ output.add(nextString());
+ }
+ return output;
+ }
+
+ /**
+ * Reads the next CSN.
+ *
+ * @return the next CSN.
+ * @throws DataFormatException
+ * if CSN was incorrectly encoded or no more data can be read from
+ * the input
+ */
+ public CSN nextCSN() throws DataFormatException
+ {
+ try
+ {
+ return CSN.valueOf(bytes.getByteSequence(CSN.BYTE_ENCODING_LENGTH));
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ throw new DataFormatException(e.getMessage());
+ }
+ }
+
+ /**
+ * Reads the next CSN that was encoded as a UTF8 string.
+ *
+ * @return the next CSN that was encoded as a UTF8 string.
+ * @throws DataFormatException
+ * if legacy CSN was incorrectly encoded or no more data can be read
+ * from the input
+ */
+ public CSN nextCSNUTF8() throws DataFormatException
+ {
+ try
+ {
+ return CSN.valueOf(nextString());
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ throw new DataFormatException(e.getMessage());
+ }
+ }
+
+ /**
+ * Returns whether the scanner has more bytes to consume.
+ *
+ * @return true if the scanner has more bytes to consume, false otherwise.
+ */
+ public boolean isEmpty()
+ {
+ return bytes.remaining() == 0;
+ }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
index b31a5cc..291e1bf 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
@@ -29,9 +29,9 @@
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.CSN;
-import org.forgerock.opendj.ldap.ByteSequenceReader;
-import org.forgerock.opendj.ldap.ByteString;
-import org.forgerock.opendj.ldap.ByteStringBuilder;
+
+import static org.opends.server.replication.protocol.ByteArrayBuilder.*;
+import static org.opends.server.replication.protocol.ProtocolVersion.*;
/**
* Class that define messages sent by a replication domain (DS) to the
@@ -39,21 +39,48 @@
*/
public class ChangeTimeHeartbeatMsg extends ReplicationMsg
{
+ private static final byte NORMAL_HEARTBEAT = 0;
+ private static final byte REPLICA_OFFLINE_HEARTBEAT = 1;
+
/**
* The CSN containing the change time.
*/
private final CSN csn;
+ /**
+ * The CSN containing the change time.
+ */
+ private final byte eventType;
+
+ private ChangeTimeHeartbeatMsg(CSN csn, byte eventType)
+ {
+ this.csn = csn;
+ this.eventType = eventType;
+ }
/**
- * Constructor of a Change Time Heartbeat message providing the change time
- * value in a CSN.
+ * Factory method that builds a change time heartbeat message providing the
+ * change time value in a CSN.
*
* @param csn
* The provided CSN.
+ * @return a new ChangeTimeHeartbeatMsg
*/
- public ChangeTimeHeartbeatMsg(CSN csn)
+ public static ChangeTimeHeartbeatMsg heartbeatMsg(CSN csn)
{
- this.csn = csn;
+ return new ChangeTimeHeartbeatMsg(csn, NORMAL_HEARTBEAT);
+ }
+
+ /**
+ * Factory method that builds a change time heartbeat message for a replica
+ * going offline.
+ *
+ * @param offlineCSN
+ * the serverId and timestamp of the replica going offline
+ * @return a new ChangeTimeHeartbeatMsg
+ */
+ public static ChangeTimeHeartbeatMsg replicaOfflineMsg(CSN offlineCSN)
+ {
+ return new ChangeTimeHeartbeatMsg(offlineCSN, REPLICA_OFFLINE_HEARTBEAT);
}
/**
@@ -67,6 +94,17 @@
}
/**
+ * Returns whether this is a replica offline message.
+ *
+ * @return true if this is a replica offline message, false if this is a
+ * regular heartbeat message.
+ */
+ public boolean isReplicaOfflineMsg()
+ {
+ return eventType == REPLICA_OFFLINE_HEARTBEAT;
+ }
+
+ /**
* Creates a message from a provided byte array.
*
* @param in
@@ -79,32 +117,31 @@
public ChangeTimeHeartbeatMsg(byte[] in, short version)
throws DataFormatException
{
- final ByteSequenceReader reader = ByteString.wrap(in).asReader();
try
{
- if (reader.get() != MSG_TYPE_CT_HEARTBEAT)
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ final byte msgType = scanner.nextByte();
+ if (msgType != MSG_TYPE_CT_HEARTBEAT)
{
- // Throw better exception below.
- throw new IllegalArgumentException();
+ throw new DataFormatException("input is not a valid "
+ + getClass().getSimpleName() + " message: " + msgType);
}
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
- {
- csn = CSN.valueOf(reader.getByteSequence(CSN.BYTE_ENCODING_LENGTH));
- }
- else
- {
- csn = CSN.valueOf(reader.getString(CSN.STRING_ENCODING_LENGTH));
- reader.get(); // Read trailing 0 byte.
- }
+ csn = version >= REPLICATION_PROTOCOL_V7
+ ? scanner.nextCSN()
+ : scanner.nextCSNUTF8();
+ eventType = version >= REPLICATION_PROTOCOL_V8
+ ? scanner.nextByte()
+ : NORMAL_HEARTBEAT;
- if (reader.remaining() > 0)
+ if (!scanner.isEmpty())
{
- // Throw better exception below.
- throw new IllegalArgumentException();
+ throw new DataFormatException(
+ "Did not expect to find more bytes to read for "
+ + getClass().getSimpleName() + " message.");
}
}
- catch (Exception e)
+ catch (RuntimeException e)
{
// Index out of bounds, bad format, etc.
throw new DataFormatException("byte[] is not a valid CT_HEARTBEAT msg");
@@ -115,24 +152,22 @@
@Override
public byte[] getBytes(short protocolVersion)
{
- if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
+ if (protocolVersion < ProtocolVersion.REPLICATION_PROTOCOL_V7)
{
- final ByteStringBuilder builder = new ByteStringBuilder(
- CSN.BYTE_ENCODING_LENGTH + 1 /* type + csn */);
+ ByteArrayBuilder builder = new ByteArrayBuilder(bytes(1) + csnsUTF8(1));
builder.append(MSG_TYPE_CT_HEARTBEAT);
- csn.toByteString(builder);
+ builder.appendUTF8(csn);
return builder.toByteArray();
}
- else
+
+ final ByteArrayBuilder builder = new ByteArrayBuilder(bytes(1) + csns(1));
+ builder.append(MSG_TYPE_CT_HEARTBEAT);
+ builder.append(csn);
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V8)
{
- final ByteStringBuilder builder = new ByteStringBuilder(
- CSN.STRING_ENCODING_LENGTH + 2 /* type + csn str + nul */);
- builder.append(MSG_TYPE_CT_HEARTBEAT);
- builder.append(csn.toString());
- builder.append((byte) 0); // For compatibility with earlier protocol
- // versions.
- return builder.toByteArray();
+ builder.append(eventType);
}
+ return builder.toByteArray();
}
/** {@inheritDoc} */
@@ -141,4 +176,5 @@
{
return getClass().getSimpleName() + ", csn=" + csn.toStringUI();
}
+
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ProtocolVersion.java b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
index 9eb2fc4..1e6e2ba 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -22,11 +22,10 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.protocol;
-
/**
* The version utility class for the replication protocol.
*/
@@ -43,52 +42,72 @@
public static final short REPLICATION_PROTOCOL_V1_REAL = 49;
/**
* The constant for the second version of the replication protocol.
- * Add fields in the header for assured replication.
+ * <ul>
+ * <li>Add fields in the header for assured replication.</li>
+ * </ul>
*/
public static final short REPLICATION_PROTOCOL_V2 = 2;
/**
* The constant for the 3rd version of the replication protocol.
- * Add messages for remote ECL : not used as of today.
+ * <ul>
+ * <li>Add messages for remote ECL : not used as of today.</li>
+ * </ul>
*/
public static final short REPLICATION_PROTOCOL_V3 = 3;
/**
* The constant for the 4th version of the replication protocol.
- * - Add to the body of the ADD/MOD/MODDN/DEL msgs, a list of attribute for
- * ECL entry attributes.
- * - Modified algorithm for choosing a RS to connect to: introduction of a
- * ReplicationServerDSMsg message.
- * -> also added of the server URL in RSInfo of TopologyMsg
- * - Introduction of a StopMsg for proper connections ending.
- * - Initialization failover/flow control
+ * <ul>
+ * <li>Add to the body of the ADD/MOD/MODDN/DEL msgs, a list of attribute for
+ * ECL entry attributes.</li>
+ * <li>Modified algorithm for choosing a RS to connect to: introduction of a
+ * ReplicationServerDSMsg message.</li>
+ * <li>also added of the server URL in RSInfo of TopologyMsg</li>
+ * <li>Introduction of a StopMsg for proper connections ending.</li>
+ * <li>Initialization failover/flow control</li>
+ * </ul>
*/
public static final short REPLICATION_PROTOCOL_V4 = 4;
/**
* The constant for the 5th version of the replication protocol.
- * - Add support for wild-cards in change log included attributes
- * - Add support for specifying additional included attributes for deletes
- * - See OPENDJ-194.
+ * <ul>
+ * <li>Add support for wild-cards in change log included attributes</li>
+ * <li>Add support for specifying additional included attributes for deletes</li>
+ * <li>See OPENDJ-194.</li>
+ * </ul>
*/
public static final short REPLICATION_PROTOCOL_V5 = 5;
/**
* The constant for the 6th version of the replication protocol.
- * - include DS local URL in the DSInfo of TopologyMsg.
+ * <ul>
+ * <li>include DS local URL in the DSInfo of TopologyMsg.</li>
+ * </ul>
*/
public static final short REPLICATION_PROTOCOL_V6 = 6;
/**
* The constant for the 7th version of the replication protocol.
- * - compact encoding for length, CSNs, and server IDs.
+ * <ul>
+ * <li>compact encoding for length, CSNs, and server IDs.</li>
+ * </ul>
*/
public static final short REPLICATION_PROTOCOL_V7 = 7;
/**
+ * The constant for the 8th version of the replication protocol.
+ * <ul>
+ * <li>StopMsg now has a timestamp to communicate the replica stop time.</li>
+ * </ul>
+ */
+ public static final short REPLICATION_PROTOCOL_V8 = 8;
+
+ /**
* The replication protocol version used by the instance of RS/DS in this VM.
*/
- private static final short CURRENT_VERSION = REPLICATION_PROTOCOL_V7;
+ private static final short CURRENT_VERSION = REPLICATION_PROTOCOL_V8;
/**
* Gets the current version of the replication protocol.
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/StopMsg.java b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/StopMsg.java
index 30228b3..c03dbf0 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/StopMsg.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/StopMsg.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
- * Portions copyright 2013 ForgeRock AS.
+ * Portions copyright 2013-2014 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -56,9 +56,7 @@
in[0]);
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte[] getBytes(short protocolVersion)
{
@@ -67,4 +65,11 @@
MSG_TYPE_STOP
};
}
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName();
+ }
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ChangelogState.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ChangelogState.java
index a454a35..a0cb04b 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ChangelogState.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ChangelogState.java
@@ -21,7 +21,7 @@
* CDDL HEADER END
*
*
- * Copyright 2013 ForgeRock AS
+ * Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -30,6 +30,7 @@
import java.util.List;
import java.util.Map;
+import org.opends.server.replication.common.CSN;
import org.opends.server.types.DN;
/**
@@ -49,6 +50,8 @@
private final Map<DN, Long> domainToGenerationId = new HashMap<DN, Long>();
private final Map<DN, List<Integer>> domainToServerIds =
new HashMap<DN, List<Integer>>();
+ private final Map<DN, List<CSN>> offlineReplicas =
+ new HashMap<DN, List<CSN>>();
/**
* Sets the generationId for the supplied replication domain.
@@ -83,6 +86,25 @@
}
/**
+ * Adds the following replica information to the offline list.
+ *
+ * @param baseDN
+ * the baseDN of the offline replica
+ * @param offlineCSN
+ * the CSN (serverId + timestamp) of the offline replica
+ */
+ public void addOfflineReplica(DN baseDN, CSN offlineCSN)
+ {
+ List<CSN> offlineCSNs = offlineReplicas.get(baseDN);
+ if (offlineCSNs == null)
+ {
+ offlineCSNs = new LinkedList<CSN>();
+ offlineReplicas.put(baseDN, offlineCSNs);
+ }
+ offlineCSNs.add(offlineCSN);
+ }
+
+ /**
* Returns the Map of domainBaseDN => generationId.
*
* @return a Map of domainBaseDN => generationId
@@ -102,11 +124,22 @@
return domainToServerIds;
}
+ /**
+ * Returns the Map of domainBaseDN => List<offlineCSN>.
+ *
+ * @return a Map of domainBaseDN => List<offlineCSN>.
+ */
+ public Map<DN, List<CSN>> getOfflineReplicas()
+ {
+ return offlineReplicas;
+ }
+
/** {@inheritDoc} */
@Override
public String toString()
{
return "domainToGenerationId=" + domainToGenerationId
- + ", domainToServerIds=" + domainToServerIds;
+ + ", domainToServerIds=" + domainToServerIds
+ + ", offlineReplicas=" + offlineReplicas;
}
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 1d68ab0..9c520ad 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2495,11 +2495,29 @@
* value received, and forwarding the message to the other RSes.
* @param senderHandler The handler for the server that sent the heartbeat.
* @param msg The message to process.
+ * @throws DirectoryException
+ * if a problem occurs
*/
void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
- ChangeTimeHeartbeatMsg msg)
+ ChangeTimeHeartbeatMsg msg) throws DirectoryException
{
- domainDB.replicaHeartbeat(baseDN, msg.getCSN());
+ try
+ {
+ if (msg.isReplicaOfflineMsg())
+ {
+ domainDB.replicaOffline(baseDN, msg.getCSN());
+ }
+ else
+ {
+ domainDB.replicaHeartbeat(baseDN, msg.getCSN());
+ }
+ }
+ catch (ChangelogException e)
+ {
+ throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e
+ .getMessageObject(), e);
+ }
+
if (senderHandler.isDataServer())
{
/*
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java
index ead02e3..d83ba85 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -843,14 +843,19 @@
/**
* Processes a change time heartbeat msg.
*
- * @param msg The message to be processed.
+ * @param msg
+ * The message to be processed.
+ * @throws DirectoryException
+ * When an exception is raised.
*/
- void process(ChangeTimeHeartbeatMsg msg)
+ void process(ChangeTimeHeartbeatMsg msg) throws DirectoryException
{
if (logger.isTraceEnabled())
+ {
logger.trace("In "
+ replicationServerDomain.getLocalRSMonitorInstanceName() + " "
+ this + " processes received msg:\n" + msg);
+ }
replicationServerDomain.processChangeTimeHeartbeatMsg(this, msg);
}
@@ -869,9 +874,9 @@
// lets update the LDAP server with out current window size and hope
// that everything will work better in the future.
// TODO also log an error message.
- WindowMsg msg = new WindowMsg(rcvWindow);
- session.publish(msg);
- } else
+ session.publish(new WindowMsg(rcvWindow));
+ }
+ else
{
// Both the LDAP server and the replication server believes that the
// window is closed. Lets check the flowcontrol in case we
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index 2f2583e..d5bdf8f 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -217,6 +217,8 @@
* the replication domain baseDN
* @param offlineCSN
* The CSN (serverId and timestamp) for the replica's going offline
+ * @throws ChangelogException
+ * If a database problem happened
*/
- void replicaOffline(DN baseDN, CSN offlineCSN);
+ void replicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException;
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 8dd2061..63af1c3 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -101,7 +101,8 @@
/**
* Holds the last time each replica was seen alive, whether via updates or
- * heartbeats received. Data is held for each serverId cross domain.
+ * heartbeat notifications, or offline notifications. Data is held for each
+ * serverId cross domain.
* <p>
* Updates are persistent and stored in the replicaDBs, heartbeats are
* transient and are easily constructed on normal operations.
@@ -219,6 +220,7 @@
* @return true if the provided baseDN is enabled for the external changelog,
* false if the provided baseDN is disabled for the external changelog
* or unknown to multimaster replication.
+ * @see MultimasterReplication#isECLEnabledDomain(DN)
*/
protected boolean isECLEnabledDomain(DN baseDN)
{
@@ -349,6 +351,20 @@
nextChangeForInsertDBCursor.next();
}
+ for (Entry<DN, List<CSN>> entry : changelogState.getOfflineReplicas()
+ .entrySet())
+ {
+ final DN baseDN = entry.getKey();
+ final List<CSN> offlineCSNs = entry.getValue();
+ for (CSN offlineCSN : offlineCSNs)
+ {
+ if (isECLEnabledDomain(baseDN))
+ {
+ replicasOffline.update(baseDN, offlineCSN);
+ }
+ }
+ }
+
// this will not be used any more. Discard for garbage collection.
this.changelogState = null;
}
@@ -550,20 +566,33 @@
}
private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
- final DN mcBaseDN)
+ final DN mcBaseDN) throws ChangelogException
{
// update, so it becomes the previous cookie for the next change
mediumConsistencyRUV.update(mcBaseDN, mcCSN);
mediumConsistency = Pair.of(mcBaseDN, mcCSN);
- final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcCSN.getServerId());
- if (offlineCSN != null
- && offlineCSN.isOlderThan(mcCSN)
- // If no new updates has been seen for this replica
- && lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN))
+ final int mcServerId = mcCSN.getServerId();
+ final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
+ final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
+ if (offlineCSN != null)
{
- removeCursor(mcBaseDN, mcCSN);
- replicasOffline.removeCSN(mcBaseDN, offlineCSN);
- mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
+ if (lastAliveCSN != null && offlineCSN.isOlderThan(lastAliveCSN))
+ {
+ // replica is back online, we can forget the last time it was offline
+ replicasOffline.removeCSN(mcBaseDN, offlineCSN);
+ }
+ else if (offlineCSN.isOlderThan(mcCSN))
+ {
+ /*
+ * replica is not back online and Medium consistency point has gone past
+ * its last offline time: remove everything known about it: cursor,
+ * offlineCSN from lastAliveCSN and remove all knowledge of this replica
+ * from the medium consistency RUV.
+ */
+ removeCursor(mcBaseDN, mcCSN);
+ lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
+ mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
+ }
}
}
@@ -583,6 +612,7 @@
}
private void removeCursor(final DN baseDN, final CSN csn)
+ throws ChangelogException
{
for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
: allCursors.entrySet())
@@ -597,6 +627,7 @@
{
iter.remove();
StaticUtils.close(entry2.getValue());
+ resetNextChangeForInsertDBCursor();
return;
}
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 28442d6..73813d7 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -815,13 +815,14 @@
/** {@inheritDoc} */
@Override
public void replicaOffline(DN baseDN, CSN offlineCSN)
+ throws ChangelogException
{
+ dbEnv.addOfflineReplica(baseDN, offlineCSN);
final ChangeNumberIndexer indexer = cnIndexer.get();
if (indexer != null)
{
indexer.replicaOffline(baseDN, offlineCSN);
}
- // TODO save this state in the changelogStateDB?
}
/**
@@ -930,5 +931,13 @@
// wait a bit before purging more
return DEFAULT_SLEEP;
}
+
+ /** {@inheritDoc} */
+ @Override
+ public void initiateShutdown()
+ {
+ super.initiateShutdown();
+ this.interrupt(); // wake up the purger thread for faster shutdown
+ }
}
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
index 577fc39..27b0175 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -98,7 +98,11 @@
synchronized (this)
{
closeCursor();
- // previously exhausted cursor must be able to reinitialize themselves
+ // Previously exhausted cursor must be able to reinitialize themselves.
+ // There is a risk of readLock never being unlocked
+ // if following code is called while the cursor is closed.
+ // It is better to let the deadlock happen to help quickly identifying
+ // and fixing such issue with unit tests.
cursor = db.openReadCursor(lastNonNullCurrentCSN);
currentChange = cursor.next();
if (currentChange != null)
@@ -131,7 +135,7 @@
}
/**
- * Called by the Gc when the object is garbage collected Release the internal
+ * Called by the Gc when the object is garbage collected. Release the internal
* cursor in case the cursor was badly used and {@link #close()} was never
* called.
*/
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index e7d27d4..0671963 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -37,6 +37,9 @@
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.opends.server.replication.common.CSN;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
@@ -64,6 +67,7 @@
private ReplicationServer replicationServer;
private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
private static final String GENERATION_ID_TAG = "GENID";
+ private static final String OFFLINE_TAG = "OFFLINE";
private static final String FIELD_SEPARATOR = " ";
/** The tracer object for the debug logger. */
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
@@ -266,6 +270,19 @@
}
result.setDomainGenerationId(baseDN, generationId);
}
+ else if (prefix.equals(OFFLINE_TAG))
+ {
+ final String[] str = stringKey.split(FIELD_SEPARATOR, 3);
+ final int serverId = toInt(str[1]);
+ final DN baseDN = DN.valueOf(str[2]);
+ long timestamp = ByteString.wrap(entry.getValue()).asReader().getLong();
+ if (logger.isTraceEnabled())
+ {
+ debug("has read replica offline: baseDN=" + baseDN + " serverId="
+ + serverId);
+ }
+ result.addOfflineReplica(baseDN, new CSN(timestamp, 0, serverId));
+ }
else
{
final String[] str = stringData.split(FIELD_SEPARATOR, 2);
@@ -274,7 +291,8 @@
if (logger.isTraceEnabled())
{
- debug("has read replica: baseDN=" + baseDN + " serverId=" + serverId);
+ debug("has read replica: baseDN=" + baseDN + " serverId="
+ + serverId);
}
result.addServerIdToDomain(serverId, baseDN);
}
@@ -415,7 +433,7 @@
// Opens the DB for the changes received from this server on this domain.
final Database replicaDB = openDatabase(replicaEntry.getKey());
- putInChangelogStateDBIfNotExist(replicaEntry);
+ putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
return replicaDB;
}
@@ -435,7 +453,7 @@
* the replica's serverId
* @return a database entry for the replica
*/
- Entry<String, String> toReplicaEntry(DN baseDN, int serverId)
+ static Entry<String, String> toReplicaEntry(DN baseDN, int serverId)
{
final String key = serverId + FIELD_SEPARATOR + baseDN.toNormalizedString();
return new SimpleImmutableEntry<String, String>(key, key);
@@ -451,30 +469,65 @@
* the domain's generationId
* @return a database entry for the generationId
*/
- Entry<String, String> toGenIdEntry(DN baseDN, long generationId)
+ static Entry<byte[], byte[]> toGenIdEntry(DN baseDN, long generationId)
{
final String normDn = baseDN.toNormalizedString();
final String key = GENERATION_ID_TAG + FIELD_SEPARATOR + normDn;
final String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId
+ FIELD_SEPARATOR + normDn;
- return new SimpleImmutableEntry<String, String>(key, data);
+ return new SimpleImmutableEntry<byte[], byte[]>(toBytes(key),toBytes(data));
}
- private void putInChangelogStateDBIfNotExist(Entry<String, String> entry)
- throws RuntimeException
+ /**
+ * Converts an Entry<String, String> to an Entry<byte[], byte[]>.
+ *
+ * @param entry
+ * the entry to convert
+ * @return the converted entry
+ */
+ static Entry<byte[], byte[]> toByteArray(Entry<String, String> entry)
{
- DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey()));
+ return new SimpleImmutableEntry<byte[], byte[]>(
+ toBytes(entry.getKey()),
+ toBytes(entry.getValue()));
+ }
+
+ /**
+ * Return an entry to store in the changelog state database representing the
+ * time a replica went offline.
+ *
+ * @param baseDN
+ * the replica's baseDN
+ * @param offlineCSN
+ * the replica's serverId and offline timestamp
+ * @return a database entry representing the time a replica went offline
+ */
+ static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN)
+ {
+ final byte[] key =
+ toBytes(OFFLINE_TAG + FIELD_SEPARATOR + offlineCSN.getServerId()
+ + FIELD_SEPARATOR + baseDN.toNormalizedString());
+ final ByteStringBuilder data = new ByteStringBuilder(8); // store a long
+ data.append(offlineCSN.getTime());
+ return new SimpleImmutableEntry<byte[], byte[]>(key, data.toByteArray());
+ }
+
+ private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry)
+ throws ChangelogException, RuntimeException
+ {
+ DatabaseEntry key = new DatabaseEntry(entry.getKey());
DatabaseEntry data = new DatabaseEntry();
if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == NOTFOUND)
{
Transaction txn = dbEnvironment.beginTransaction(null, null);
try
{
- data.setData(toBytes(entry.getValue()));
+ data.setData(entry.getValue());
if (logger.isTraceEnabled())
{
debug("putting record in the changelogstate Db key=["
- + entry.getKey() + "] value=[" + entry.getValue() + "]");
+ + toString(entry.getKey()) + "] value=["
+ + toString(entry.getValue()) + "]");
}
changelogStateDb.put(txn, key, data);
txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
@@ -584,11 +637,11 @@
*/
public void clearServerId(DN baseDN, int serverId) throws ChangelogException
{
- deleteFromChangelogStateDB(toReplicaEntry(baseDN, serverId),
+ deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
"clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
}
- private void deleteFromChangelogStateDB(Entry<String, ?> entry,
+ private void deleteFromChangelogStateDB(Entry<byte[], ?> entry,
String methodInvocation) throws ChangelogException
{
if (logger.isTraceEnabled())
@@ -598,7 +651,7 @@
try
{
- final DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey()));
+ final DatabaseEntry key = new DatabaseEntry(entry.getKey());
final DatabaseEntry data = new DatabaseEntry();
if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == SUCCESS)
{
@@ -619,18 +672,65 @@
throw dbe;
}
}
- else
+ else if (logger.isTraceEnabled())
{
- if (logger.isTraceEnabled())
- {
- debug(methodInvocation + " failed: key=[ " + entry.getKey()
- + "] not found");
- }
+ debug(methodInvocation + " failed: key not found");
}
}
- catch (RuntimeException dbe)
+ catch (RuntimeException e)
{
- throw new ChangelogException(dbe);
+ if (logger.isTraceEnabled())
+ {
+ debug(methodInvocation + " error: " + stackTraceToSingleLineString(e));
+ }
+ throw new ChangelogException(e);
+ }
+ }
+
+ /**
+ * Add the information about an offline replica to the changelog state DB.
+ *
+ * @param baseDN
+ * the domain of the offline replica
+ * @param offlineCSN
+ * the offline replica serverId and offline timestamp
+ * @throws ChangelogException
+ * if a database problem occurred
+ */
+ public void addOfflineReplica(DN baseDN, CSN offlineCSN)
+ throws ChangelogException
+ {
+ // just overwrite any older entry as it is assumed a newly received offline
+ // CSN is newer than the previous one
+ putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
+ "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
+ }
+
+ private void putInChangelogStateDB(Entry<byte[], byte[]> entry,
+ String methodInvocation) throws ChangelogException
+ {
+ if (logger.isTraceEnabled())
+ {
+ debug(methodInvocation + " starting");
+ }
+
+ try
+ {
+ final DatabaseEntry key = new DatabaseEntry(entry.getKey());
+ final DatabaseEntry data = new DatabaseEntry(entry.getValue());
+ changelogStateDb.put(null, key, data);
+ if (logger.isTraceEnabled())
+ {
+ debug(methodInvocation + " succeeded");
+ }
+ }
+ catch (RuntimeException e)
+ {
+ if (logger.isTraceEnabled())
+ {
+ debug(methodInvocation + " error: " + stackTraceToSingleLineString(e));
+ }
+ throw new ChangelogException(e);
}
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
index 15a784b..f3f74c5 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -34,7 +34,6 @@
import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.Session;
import org.opends.server.util.StaticUtils;
-import org.opends.server.util.TimeThread;
/**
* This thread publishes a {@link ChangeTimeHeartbeatMsg} on a given protocol
@@ -70,15 +69,15 @@
* @param session The session on which heartbeats are to be sent.
* @param heartbeatInterval The interval between heartbeats sent
* (in milliseconds).
- * @param serverId2 The serverId of the sender domain.
+ * @param serverId The serverId of the sender domain.
*/
public CTHeartbeatPublisherThread(String threadName, Session session,
- long heartbeatInterval, int serverId2)
+ long heartbeatInterval, int serverId)
{
super(threadName);
this.session = session;
this.heartbeatInterval = heartbeatInterval;
- this.serverId = serverId2;
+ this.serverId = serverId;
}
/**
@@ -87,6 +86,7 @@
@Override
public void run()
{
+ long lastHeartbeatTime = 0;
try
{
if (logger.isTraceEnabled())
@@ -97,13 +97,12 @@
while (!shutdown)
{
- long now = System.currentTimeMillis();
- final CSN csn = new CSN(TimeThread.getTime(), 0, serverId);
- ChangeTimeHeartbeatMsg ctHeartbeatMsg = new ChangeTimeHeartbeatMsg(csn);
-
+ final long now = System.currentTimeMillis();
if (now > session.getLastPublishTime() + heartbeatInterval)
{
- session.publish(ctHeartbeatMsg);
+ final CSN csn = new CSN(now, 0, serverId);
+ session.publish(ChangeTimeHeartbeatMsg.heartbeatMsg(csn));
+ lastHeartbeatTime = csn.getTime();
}
long sleepTime = session.getLastPublishTime() + heartbeatInterval - now;
@@ -129,6 +128,26 @@
}
}
}
+
+ if (shutdown)
+ {
+ /*
+ * Shortcoming: this thread is restarted each time the DS reconnects,
+ * e.g. during load balancing. This is not that much of a problem
+ * because the ChangeNumberIndexer tolerates receiving replica offline
+ * heartbeats and then receiving messages back again.
+ */
+ /*
+ * However, during shutdown we need to be sure that all pending client
+ * operations have either completed or have been aborted before shutting
+ * down replication. Otherwise, the medium consistency will move forward
+ * without knowing about these changes.
+ */
+ final long now = System.currentTimeMillis();
+ final int seqNum = lastHeartbeatTime == now ? 1 : 0;
+ final CSN offlineCSN = new CSN(now, seqNum, serverId);
+ session.publish(ChangeTimeHeartbeatMsg.replicaOfflineMsg(offlineCSN));
+ }
}
catch (IOException e)
{
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ByteArrayTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ByteArrayTest.java
new file mode 100644
index 0000000..a05b230
--- /dev/null
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ByteArrayTest.java
@@ -0,0 +1,83 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.protocol;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.replication.common.CSN;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Test for {@link ByteArrayBuilder} and {@link ByteArrayScanner} classes.
+ */
+@SuppressWarnings("javadoc")
+public class ByteArrayTest extends DirectoryServerTestCase
+{
+
+ @Test
+ public void testBuilderAppendMethodsAndScannerNextMethods() throws Exception
+ {
+ final boolean bo = true;
+ final byte by = 80;
+ final short sh = 42;
+ final int i = sh + 1;
+ final long l = i + 1;
+ final String st = "Yay!";
+ final Collection<String> col = Arrays.asList("foo", "bar", "baz");
+ final CSN csn = new CSN(42424242, 13, 42);
+
+ byte[] bytes = new ByteArrayBuilder()
+ .append(bo)
+ .append(by)
+ .append(sh)
+ .append(i)
+ .append(l)
+ .append(st)
+ .appendStrings(col)
+ .appendUTF8(i)
+ .appendUTF8(l)
+ .append(csn)
+ .appendUTF8(csn)
+ .toByteArray();
+
+ final ByteArrayScanner scanner = new ByteArrayScanner(bytes);
+ Assert.assertEquals(scanner.nextBoolean(), bo);
+ Assert.assertEquals(scanner.nextByte(), by);
+ Assert.assertEquals(scanner.nextShort(), sh);
+ Assert.assertEquals(scanner.nextInt(), i);
+ Assert.assertEquals(scanner.nextLong(), l);
+ Assert.assertEquals(scanner.nextString(), st);
+ Assert.assertEquals(scanner.nextStrings(new ArrayList<String>()), col);
+ Assert.assertEquals(scanner.nextIntUTF8(), i);
+ Assert.assertEquals(scanner.nextLongUTF8(), l);
+ Assert.assertEquals(scanner.nextCSN(), csn);
+ Assert.assertEquals(scanner.nextCSNUTF8(), csn);
+ Assert.assertTrue(scanner.isEmpty());
+ }
+}
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index c748d51..9ff958a 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -52,6 +52,7 @@
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.replication.protocol.OperationContext.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
+import static org.opends.server.util.StaticUtils.*;
import static org.testng.Assert.*;
/**
@@ -108,10 +109,8 @@
List<Modification> mods4 = new ArrayList<Modification>();
for (int i = 0; i < 10; i++)
{
- Attribute attr = Attributes.create("description", "string"
- + String.valueOf(i));
- Modification mod = new Modification(ModificationType.ADD, attr);
- mods4.add(mod);
+ Attribute attr = Attributes.create("description", "string" + i);
+ mods4.add(new Modification(ModificationType.ADD, attr));
}
Attribute attr5 = Attributes.create("namingcontexts", TEST_ROOT_DN_STRING);
@@ -157,14 +156,14 @@
msg.setAssuredMode(assuredMode);
msg.setSafeDataLevel(safeDataLevel);
- // Set ECL entry inlcuded attributes
+ // Set ECL entry included attributes
if (entryAttrList != null)
{
msg.setEclIncludes(entryAttrList);
}
ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg.generateMsg(
- msg.getBytes(), ProtocolVersion.getCurrentVersion());
+ msg.getBytes(), getCurrentVersion());
// Test that generated attributes match original attributes.
assertEquals(generatedMsg.isAssured(), isAssured);
@@ -219,7 +218,7 @@
// Check equals
ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg.generateMsg(
- msg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
+ msg.getBytes(), REPLICATION_PROTOCOL_V1);
assertFalse(msg.equals(null));
assertFalse(msg.equals(new Object()));
@@ -303,7 +302,7 @@
}
msg.setInitiatorsName("johnny h");
DeleteMsg generatedMsg = (DeleteMsg) ReplicationMsg.generateMsg(
- msg.getBytes(), ProtocolVersion.getCurrentVersion());
+ msg.getBytes(), getCurrentVersion());
assertEquals(msg.toString(), generatedMsg.toString());
assertEquals(msg.getInitiatorsName(), generatedMsg.getInitiatorsName());
@@ -398,8 +397,8 @@
msg.setEclIncludes(entryAttrList);
}
- ModifyDNMsg generatedMsg = (ModifyDNMsg) ReplicationMsg
- .generateMsg(msg.getBytes(), ProtocolVersion.getCurrentVersion());
+ ModifyDNMsg generatedMsg = (ModifyDNMsg) ReplicationMsg.generateMsg(
+ msg.getBytes(), getCurrentVersion());
// Test that generated attributes match original attributes.
assertEquals(generatedMsg.isAssured(), isAssured);
@@ -477,8 +476,8 @@
msg.setEclIncludes(entryAttrList);
}
- AddMsg generatedMsg = (AddMsg) ReplicationMsg.generateMsg(msg
- .getBytes(), ProtocolVersion.getCurrentVersion());
+ AddMsg generatedMsg = (AddMsg) ReplicationMsg.generateMsg(
+ msg.getBytes(), getCurrentVersion());
assertEquals(generatedMsg.getBytes(), msg.getBytes());
assertEquals(generatedMsg.toString(), msg.toString());
@@ -829,6 +828,33 @@
new StopMsg(msg.getBytes(getCurrentVersion()));
}
+ @Test
+ public void changeTimeHeartbeatMsgTest() throws Exception
+ {
+ final short v1 = REPLICATION_PROTOCOL_V1;
+ final short v7 = REPLICATION_PROTOCOL_V7;
+ final short v8 = REPLICATION_PROTOCOL_V8;
+
+ final CSN csn = new CSN(System.currentTimeMillis(), 0, 42);
+ final ChangeTimeHeartbeatMsg heartbeatMsg = ChangeTimeHeartbeatMsg.heartbeatMsg(csn);
+ assertCTHearbeatMsg(heartbeatMsg, v1, false);
+ assertCTHearbeatMsg(heartbeatMsg, v7, false);
+ assertCTHearbeatMsg(heartbeatMsg, v8, false);
+
+ final ChangeTimeHeartbeatMsg offlineMsg = ChangeTimeHeartbeatMsg.replicaOfflineMsg(csn);
+ assertCTHearbeatMsg(offlineMsg, v1, false);
+ assertCTHearbeatMsg(offlineMsg, v7, false);
+ assertCTHearbeatMsg(offlineMsg, v8, true);
+ }
+
+ private void assertCTHearbeatMsg(ChangeTimeHeartbeatMsg heartbeatMsg,
+ short version, boolean expected) throws DataFormatException
+ {
+ final byte[] bytes = heartbeatMsg.getBytes(version);
+ ChangeTimeHeartbeatMsg decodedMsg = new ChangeTimeHeartbeatMsg(bytes, version);
+ assertEquals(decodedMsg.isReplicaOfflineMsg(), expected);
+ }
+
/**
* Test that WindowMsg encoding and decoding works
* by checking that : msg == new WindowMsg(msg.getBytes()).
@@ -914,8 +940,7 @@
throws Exception
{
TopologyMsg msg = new TopologyMsg(dsList, rsList);
- TopologyMsg newMsg = new TopologyMsg(msg.getBytes(getCurrentVersion()),
- ProtocolVersion.getCurrentVersion());
+ TopologyMsg newMsg = new TopologyMsg(msg.getBytes(getCurrentVersion()), getCurrentVersion());
assertEquals(msg.getReplicaInfos(), newMsg.getReplicaInfos());
assertEquals(msg.getRsInfos(), newMsg.getRsInfos());
}
@@ -1092,16 +1117,14 @@
msg.setServerState(sid3, s3, now+3, false);
byte[] b = msg.getBytes(getCurrentVersion());
- MonitorMsg newMsg = new MonitorMsg(b, ProtocolVersion.getCurrentVersion());
+ MonitorMsg newMsg = new MonitorMsg(b, getCurrentVersion());
assertEquals(rsState, msg.getReplServerDbState());
assertEquals(newMsg.getReplServerDbState().toString(),
msg.getReplServerDbState().toString());
- Iterator<Integer> it = newMsg.ldapIterator();
- while (it.hasNext())
+ for (int sid : toIterable(newMsg.ldapIterator()))
{
- int sid = it.next();
ServerState s = newMsg.getLDAPServerState(sid);
if (sid == sid1)
{
@@ -1119,10 +1142,8 @@
}
}
- Iterator<Integer> it2 = newMsg.rsIterator();
- while (it2.hasNext())
+ for (int sid : toIterable(newMsg.rsIterator()))
{
- int sid = it2.next();
ServerState s = newMsg.getRSServerState(sid);
if (sid == sid3)
{
@@ -1381,7 +1402,7 @@
encodemsg += (t4 - t31);
// getBytes
- byte[] bytes = generatedMsg.getBytes(ProtocolVersion.getCurrentVersion());
+ byte[] bytes = generatedMsg.getBytes(getCurrentVersion());
t5 = System.nanoTime();
getbytes += (t5 - t4);
@@ -1456,7 +1477,7 @@
encodemsg += (t4 - t31);
// getBytes
- byte[] bytes = generatedMsg.getBytes(ProtocolVersion.getCurrentVersion());
+ byte[] bytes = generatedMsg.getBytes(getCurrentVersion());
t5 = System.nanoTime();
getbytes += (t5 - t4);
@@ -1530,7 +1551,7 @@
encodemsg += (t4 - t31);
// getBytes
- byte[] bytes = generatedMsg.getBytes(ProtocolVersion.getCurrentVersion());
+ byte[] bytes = generatedMsg.getBytes(getCurrentVersion());
t5 = System.nanoTime();
getbytes += (t5 - t4);
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 0236b98..e6939dd 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -388,9 +388,18 @@
final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
publishUpdateMsg(msg4);
- // MCP moved forward after receiving update from serverId1
+ // MCP moves forward after receiving update from serverId1
// (last replica in the domain)
assertExternalChangelogContent(msg1, msg2, msg4);
+
+ // serverId2 comes online again
+ final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId2, 5);
+ publishUpdateMsg(msg5);
+ // MCP does not move until it knows what happens to serverId1
+ assertExternalChangelogContent(msg1, msg2, msg4);
+ sendHeartbeat(BASE_DN1, serverId1, 6);
+ // MCP moves forward
+ assertExternalChangelogContent(msg1, msg2, msg4, msg5);
}
private void addReplica(DN baseDN, int serverId) throws Exception
@@ -418,11 +427,13 @@
waitForWaitingState(cnIndexer);
}
- private void stopCNIndexer()
+ private void stopCNIndexer() throws Exception
{
if (cnIndexer != null)
{
cnIndexer.initiateShutdown();
+ cnIndexer.interrupt();
+ cnIndexer.join();
}
}
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java
index b051c68..972c1c4 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java
@@ -31,6 +31,7 @@
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.common.CSN;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.DN;
@@ -43,6 +44,7 @@
import com.sleepycat.je.Environment;
import static java.util.Arrays.*;
+import static java.util.Collections.*;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.replication.server.changelog.je.ReplicationDbEnv.*;
@@ -51,78 +53,103 @@
public class ReplicationDbEnvTest extends DirectoryServerTestCase
{
- /**
- * Bypass heavyweight setup.
- */
- private final class TestableReplicationDbEnv extends ReplicationDbEnv
- {
- private TestableReplicationDbEnv() throws ChangelogException
- {
- super(null, null);
- }
+ /**
+ * Bypass heavyweight setup.
+ */
+ private final class TestableReplicationDbEnv extends ReplicationDbEnv
+ {
+ private TestableReplicationDbEnv() throws ChangelogException
+ {
+ super(null, null);
+ }
- @Override
- protected Environment openJEEnvironment(String path)
- {
- return null;
- }
+ @Override
+ protected Environment openJEEnvironment(String path)
+ {
+ return null;
+ }
- @Override
- protected Database openDatabase(String databaseName)
- throws ChangelogException, RuntimeException
- {
- return null;
- }
- }
+ @Override
+ protected Database openDatabase(String databaseName)
+ throws ChangelogException, RuntimeException
+ {
+ return null;
+ }
+ }
- @BeforeClass
- public void setup() throws Exception
- {
- TestCaseUtils.startFakeServer();
- }
+ @BeforeClass
+ public void setup() throws Exception
+ {
+ TestCaseUtils.startFakeServer();
+ }
- @AfterClass
- public void teardown()
- {
- TestCaseUtils.shutdownFakeServer();
- }
+ @AfterClass
+ public void teardown()
+ {
+ TestCaseUtils.shutdownFakeServer();
+ }
- @DataProvider
- public Object[][] changelogStateDataProvider() throws Exception
- {
- return new Object[][] {
- { DN.valueOf("dc=example,dc=com"), 524157415, asList(42, 346) },
- // test with a space in the baseDN (space is the field separator in the DB)
- { DN.valueOf("cn=admin data"), 524157415, asList(42, 346) },
- };
- }
+ @DataProvider
+ public Object[][] changelogStateDataProvider() throws Exception
+ {
+ final int genId = 524157415;
+ final int id1 = 42;
+ final int id2 = 346;
+ final int t1 = 1956245524;
+ return new Object[][] {
+ { DN.valueOf("dc=example,dc=com"), genId, EMPTY_LIST, EMPTY_LIST },
+ { DN.valueOf("dc=example,dc=com"), genId, asList(id1, id2),
+ asList(new CSN(id2, 0, t1)) },
+ // test with a space in the baseDN (space is the field separator in the DB)
+ { DN.valueOf("cn=admin data"), genId, asList(id1, id2), EMPTY_LIST }, };
+ }
- @Test(dataProvider = "changelogStateDataProvider")
- public void encodeDecodeChangelogState(DN baseDN, long generationId,
- List<Integer> serverIds) throws Exception
- {
- final ReplicationDbEnv changelogStateDB = new TestableReplicationDbEnv();
+ @Test(dataProvider = "changelogStateDataProvider")
+ public void encodeDecodeChangelogState(DN baseDN, long generationId,
+ List<Integer> replicas, List<CSN> offlineReplicas) throws Exception
+ {
+ final ReplicationDbEnv changelogStateDB = new TestableReplicationDbEnv();
- // encode data
- final Map<byte[], byte[]> wholeState = new LinkedHashMap<byte[], byte[]>();
- put(wholeState, changelogStateDB.toGenIdEntry(baseDN, generationId));
- for (Integer serverId : serverIds)
- {
- put(wholeState, changelogStateDB.toReplicaEntry(baseDN, serverId));
- }
+ // encode data
+ final Map<byte[], byte[]> wholeState = new LinkedHashMap<byte[], byte[]>();
+ put(wholeState, toGenIdEntry(baseDN, generationId));
+ for (Integer serverId : replicas)
+ {
+ put(wholeState, toByteArray(toReplicaEntry(baseDN, serverId)));
+ }
+ for (CSN offlineCSN : offlineReplicas)
+ {
+ put(wholeState, toReplicaOfflineEntry(baseDN, offlineCSN));
+ }
- // decode data
- final ChangelogState state =
- changelogStateDB.decodeChangelogState(wholeState);
- assertThat(state.getDomainToGenerationId()).containsExactly(
- entry(baseDN, generationId));
- assertThat(state.getDomainToServerIds()).containsExactly(
- entry(baseDN, serverIds));
- }
+ // decode data
+ final ChangelogState state =
+ changelogStateDB.decodeChangelogState(wholeState);
+ assertThat(state.getDomainToGenerationId()).containsExactly(
+ entry(baseDN, generationId));
+ if (!replicas.isEmpty())
+ {
+ assertThat(state.getDomainToServerIds()).containsExactly(
+ entry(baseDN, replicas));
+ }
+ else
+ {
+ assertThat(state.getDomainToServerIds()).isEmpty();
+ }
+ if (!offlineReplicas.isEmpty())
+ {
+ assertThat(state.getOfflineReplicas()).containsExactly(
+ entry(baseDN, offlineReplicas));
+ }
+ else
+ {
+ assertThat(state.getOfflineReplicas()).isEmpty();
+ }
+ }
- private void put(Map<byte[], byte[]> map, Entry<String, String> entry)
- {
- map.put(toBytes(entry.getKey()), toBytes(entry.getValue()));
- }
+ private void put(Map<byte[], byte[]> map, Entry<byte[], byte[]> entry)
+ {
+ map.put(entry.getKey(), entry.getValue());
+ }
}
--
Gitblit v1.10.0