OPENDJ-1259 (CR-3443) Make the Medium Consistency Point support replicas temporarily leaving the topology
Changed ChangeTimeHeartbeatMsg to be able to send timestamps for a replica stopping. This is done by adding an event type to cover both normal heartbeat and temporarily offline replica.
This allows RSs to compute medium consistency accurately by using the stop time from the originating server.
Because of this change, incremented protocol version to v8.
ProtocolVersion.java:
Added REPLICATION_PROTOCOL_V8.
Fixed javadocs by using lists.
ByteArrayBuilder.java, ByteArrayScanner.java, ByteArrayTest.java: ADDED
ChangeTimeHeartbeatMsg.java:
Added eventType field + added isReplicaOfflineMsg().
Made ctor private + added factory methods heartbeatMsg() and replicaOfflineMsg().
Used ByteArrayBuilder and ByteArrayScanner.
CTHeartbeatPublisherThread.java:
In run(), on shutdown, handle sending a replica offline message.
SynchronizationMsgTest.java:
Added changeTimeHeartbeatMsgTest() and assertCTHearbeatMsg().
Code cleanup + used static import for ProtocolVersion.
DataServerHandler.java:
Consequence of the change to StopMsg ctor.
Extracted method publishStopMsg() for increased readability.
ReplicationServerDomain.java:
In processChangeTimeHeartbeatMsg(), added support for offline replica messages + declared DirectoryException checked exception.
ServerHandler.java:
Consequence of the change to ReplicationServerDomain.processChangeTimeHeartbeatMsg() checked exception.
ChangeNumberIndexer.java:
In initialize(), handle offline replicas.
In moveForwardMediumConsistencyPoint(), now throw ChangelogException.
In removeCursor(), now call resetNextChangeForInsertDBCursor().
ChangeNumberIndexerTest.java:
Expanded the emptyDBTwoDSsOneGoingOffline() test.
JEChangelogDB.java, ReplicationDomainDB.java:
In replicaOffline(), now throw ChangelogException and call ReplicationDbEnv.addOfflineReplica()
Added ChangelogDBPurger.initiateShutdown().
ReplicationDbEnv.java:
Added OFFLINE_TAG constant, toReplicaOfflineEntry(), addOfflineReplica(), putInChangelogStateDB() and toByteArray().
In decodeChangelogState(), handle offline replica information.
Changed several method signatures to use Entry<byte[], byte[]> instead of Entry<String, String>.
ReplicationDbEnvTest.java:
Fixed indentation + added tests.
ChangelogState.java:
Added offlineReplicas field + addOfflineReplica() and getOfflineReplicas() + changed toString() to display it.
StopMsg.java:
Added toString().
JEReplicaDBCursor.java:
Improved comments and javadocs.
3 files added
15 files modified
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.Collection; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | | |
| | | /** |
| | | * Byte array builder class encodes data into byte arrays to send messages over |
| | | * the replication protocol. Built on top of {@link ByteStringBuilder}, it |
| | | * isolates the latter against legacy type conversions from the replication |
| | | * protocol. It exposes a fluent API. |
| | | * |
| | | * @see ByteArrayScanner ByteArrayScanner class that decodes messages built with |
| | | * current class. |
| | | */ |
| | | public class ByteArrayBuilder |
| | | { |
| | | |
| | | /** This is the null byte, also known as zero byte. */ |
| | | public static final byte NULL_BYTE = 0; |
| | | private final ByteStringBuilder builder; |
| | | |
| | | /** |
| | | * Constructs a ByteArrayBuilder. |
| | | */ |
| | | public ByteArrayBuilder() |
| | | { |
| | | builder = new ByteStringBuilder(); |
| | | } |
| | | |
| | | /** |
| | | * Constructs a ByteArrayBuilder. |
| | | * |
| | | * @param capacity |
| | | * the capacity of the underlying ByteStringBuilder |
| | | */ |
| | | public ByteArrayBuilder(int capacity) |
| | | { |
| | | builder = new ByteStringBuilder(capacity); |
| | | } |
| | | |
| | | /** |
| | | * Append a boolean to this ByteArrayBuilder. |
| | | * |
| | | * @param b |
| | | * the boolean to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder append(boolean b) |
| | | { |
| | | append((byte) (b ? 1 : 0)); |
| | | return this; |
| | | } |
| | | |
| | | /** |
| | | * Append a byte to this ByteArrayBuilder. |
| | | * |
| | | * @param b |
| | | * the byte to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder append(byte b) |
| | | { |
| | | builder.append(b); |
| | | return this; |
| | | } |
| | | |
| | | /** |
| | | * Append a short to this ByteArrayBuilder. |
| | | * |
| | | * @param s |
| | | * the short to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder append(short s) |
| | | { |
| | | builder.append(s); |
| | | return this; |
| | | } |
| | | |
| | | /** |
| | | * Append an int to this ByteArrayBuilder. |
| | | * |
| | | * @param i |
| | | * the long to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder append(int i) |
| | | { |
| | | builder.append(i); |
| | | return this; |
| | | } |
| | | |
| | | /** |
| | | * Append a long to this ByteArrayBuilder. |
| | | * |
| | | * @param l |
| | | * the long to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder append(long l) |
| | | { |
| | | builder.append(l); |
| | | return this; |
| | | } |
| | | |
| | | /** |
| | | * Append an int to this ByteArrayBuilder by converting it to a String then |
| | | * encoding that string to a UTF-8 byte array. |
| | | * |
| | | * @param i |
| | | * the int to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder appendUTF8(int i) |
| | | { |
| | | return append(Integer.toString(i)); |
| | | } |
| | | |
| | | /** |
| | | * Append a long to this ByteArrayBuilder by converting it to a String then |
| | | * encoding that string to a UTF-8 byte array. |
| | | * |
| | | * @param l |
| | | * the long to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder appendUTF8(long l) |
| | | { |
| | | return append(Long.toString(l)); |
| | | } |
| | | |
| | | /** |
| | | * Append a Collection of Strings to this ByteArrayBuilder. |
| | | * |
| | | * @param col |
| | | * the Collection of Strings to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder appendStrings(Collection<String> col) |
| | | { |
| | | append(col.size()); |
| | | for (String s : col) |
| | | { |
| | | append(s); |
| | | } |
| | | return this; |
| | | } |
| | | |
| | | /** |
| | | * Append a String to this ByteArrayBuilder. |
| | | * |
| | | * @param s |
| | | * the String to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder append(String s) |
| | | { |
| | | try |
| | | { |
| | | append(s.getBytes("UTF-8")); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | throw new RuntimeException("Should never happen", e); |
| | | } |
| | | return this; |
| | | } |
| | | |
| | | /** |
| | | * Append a CSN to this ByteArrayBuilder. |
| | | * |
| | | * @param csn |
| | | * the CSN to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder append(CSN csn) |
| | | { |
| | | csn.toByteString(builder); |
| | | return this; |
| | | } |
| | | |
| | | /** |
| | | * Append a CSN to this ByteArrayBuilder by converting it to a String then |
| | | * encoding that string to a UTF-8 byte array. |
| | | * |
| | | * @param csn |
| | | * the CSN to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder appendUTF8(CSN csn) |
| | | { |
| | | append(csn.toString()); |
| | | return this; |
| | | } |
| | | |
| | | private ByteArrayBuilder append(byte[] sBytes) |
| | | { |
| | | for (byte b : sBytes) |
| | | { |
| | | append(b); |
| | | } |
| | | append((byte) 0); // zero separator |
| | | return this; |
| | | } |
| | | |
| | | /** |
| | | * Converts the content of this ByteStringBuilder to a byte array. |
| | | * |
| | | * @return the content of this ByteStringBuilder converted to a byte array. |
| | | */ |
| | | public byte[] toByteArray() |
| | | { |
| | | return builder.toByteArray(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return builder.toString(); |
| | | } |
| | | |
| | | /** |
| | | * Helper method that returns the number of bytes that would be used by the |
| | | * boolean fields when appended to a ByteArrayBuilder. |
| | | * |
| | | * @param nbFields |
| | | * the number of boolean fields that will be appended to a |
| | | * ByteArrayBuilder |
| | | * @return the number of bytes occupied by the appended boolean fields. |
| | | */ |
| | | public static int booleans(int nbFields) |
| | | { |
| | | return nbFields; |
| | | } |
| | | |
| | | /** |
| | | * Helper method that returns the number of bytes that would be used by the |
| | | * byte fields when appended to a ByteArrayBuilder. |
| | | * |
| | | * @param nbFields |
| | | * the number of byte fields that will be appended to a |
| | | * ByteArrayBuilder |
| | | * @return the number of bytes occupied by the appended byte fields. |
| | | */ |
| | | public static int bytes(int nbFields) |
| | | { |
| | | return nbFields; |
| | | } |
| | | |
| | | /** |
| | | * Helper method that returns the number of bytes that would be used by the |
| | | * short fields when appended to a ByteArrayBuilder. |
| | | * |
| | | * @param nbFields |
| | | * the number of short fields that will be appended to a |
| | | * ByteArrayBuilder |
| | | * @return the number of bytes occupied by the appended short fields. |
| | | */ |
| | | public static int shorts(int nbFields) |
| | | { |
| | | return 2 * nbFields; |
| | | } |
| | | |
| | | /** |
| | | * Helper method that returns the number of bytes that would be used by the |
| | | * int fields when appended to a ByteArrayBuilder. |
| | | * |
| | | * @param nbFields |
| | | * the number of int fields that will be appended to a |
| | | * ByteArrayBuilder |
| | | * @return the number of bytes occupied by the appended int fields. |
| | | */ |
| | | public static int ints(int nbFields) |
| | | { |
| | | return 4 * nbFields; |
| | | } |
| | | |
| | | /** |
| | | * Helper method that returns the number of bytes that would be used by the |
| | | * long fields when appended to a ByteArrayBuilder. |
| | | * |
| | | * @param nbFields |
| | | * the number of long fields that will be appended to a |
| | | * ByteArrayBuilder |
| | | * @return the number of bytes occupied by the appended long fields. |
| | | */ |
| | | public static int longs(int nbFields) |
| | | { |
| | | return 8 * nbFields; |
| | | } |
| | | |
| | | /** |
| | | * Helper method that returns the number of bytes that would be used by the |
| | | * CSN fields when appended to a ByteArrayBuilder. |
| | | * |
| | | * @param nbFields |
| | | * the number of CSN fields that will be appended to a |
| | | * ByteArrayBuilder |
| | | * @return the number of bytes occupied by the appended CSN fields. |
| | | */ |
| | | public static int csns(int nbFields) |
| | | { |
| | | return CSN.BYTE_ENCODING_LENGTH * nbFields; |
| | | } |
| | | |
| | | /** |
| | | * Helper method that returns the number of bytes that would be used by the |
| | | * CSN fields encoded as a UTF8 string when appended to a ByteArrayBuilder. |
| | | * |
| | | * @param nbFields |
| | | * the number of CSN fields that will be appended to a |
| | | * ByteArrayBuilder |
| | | * @return the number of bytes occupied by the appended legacy-encoded CSN |
| | | * fields. |
| | | */ |
| | | public static int csnsUTF8(int nbFields) |
| | | { |
| | | return CSN.STRING_ENCODING_LENGTH * nbFields + 1 /* null byte */; |
| | | } |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | | import java.util.Collection; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.types.ByteSequenceReader; |
| | | import org.opends.server.types.ByteString; |
| | | |
| | | /** |
| | | * Byte array scanner class helps decode data from byte arrays received via |
| | | * messages over the replication protocol. Built on top of |
| | | * {@link ByteSequenceReader}, it isolates the latter against legacy type |
| | | * conversions from the replication protocol. |
| | | * |
| | | * @see ByteArrayBuilder ByteArrayBuilder class that encodes messages read with |
| | | * current class. |
| | | */ |
| | | public class ByteArrayScanner |
| | | { |
| | | |
| | | private final ByteSequenceReader bytes; |
| | | |
| | | /** |
| | | * Builds a ByteArrayScanner object that will read from the supplied byte |
| | | * array. |
| | | * |
| | | * @param bytes |
| | | * the byte array input that will be read from |
| | | */ |
| | | public ByteArrayScanner(byte[] bytes) |
| | | { |
| | | this.bytes = ByteString.wrap(bytes).asReader(); |
| | | } |
| | | |
| | | /** |
| | | * Reads the next boolean. |
| | | * |
| | | * @return the next boolean |
| | | * @throws DataFormatException |
| | | * if no more data can be read from the input |
| | | */ |
| | | public boolean nextBoolean() throws DataFormatException |
| | | { |
| | | return nextByte() != 0; |
| | | } |
| | | |
| | | /** |
| | | * Reads the next byte. |
| | | * |
| | | * @return the next byte |
| | | * @throws DataFormatException |
| | | * if no more data can be read from the input |
| | | */ |
| | | public byte nextByte() throws DataFormatException |
| | | { |
| | | try |
| | | { |
| | | return bytes.get(); |
| | | } |
| | | catch (IndexOutOfBoundsException e) |
| | | { |
| | | throw new DataFormatException(e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Reads the next short. |
| | | * |
| | | * @return the next short |
| | | * @throws DataFormatException |
| | | * if no more data can be read from the input |
| | | */ |
| | | public short nextShort() throws DataFormatException |
| | | { |
| | | try |
| | | { |
| | | return bytes.getShort(); |
| | | } |
| | | catch (IndexOutOfBoundsException e) |
| | | { |
| | | throw new DataFormatException(e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Reads the next int. |
| | | * |
| | | * @return the next int |
| | | * @throws DataFormatException |
| | | * if no more data can be read from the input |
| | | */ |
| | | public int nextInt() throws DataFormatException |
| | | { |
| | | try |
| | | { |
| | | return bytes.getInt(); |
| | | } |
| | | catch (IndexOutOfBoundsException e) |
| | | { |
| | | throw new DataFormatException(e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Reads the next long. |
| | | * |
| | | * @return the next long |
| | | * @throws DataFormatException |
| | | * if no more data can be read from the input |
| | | */ |
| | | public long nextLong() throws DataFormatException |
| | | { |
| | | try |
| | | { |
| | | return bytes.getLong(); |
| | | } |
| | | catch (IndexOutOfBoundsException e) |
| | | { |
| | | throw new DataFormatException(e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Reads the next int that was encoded as a UTF8 string. |
| | | * |
| | | * @return the next int that was encoded as a UTF8 string. |
| | | * @throws DataFormatException |
| | | * if no more data can be read from the input |
| | | */ |
| | | public int nextIntUTF8() throws DataFormatException |
| | | { |
| | | return Integer.valueOf(nextString()); |
| | | } |
| | | |
| | | /** |
| | | * Reads the next long that was encoded as a UTF8 string. |
| | | * |
| | | * @return the next long that was encoded as a UTF8 string. |
| | | * @throws DataFormatException |
| | | * if no more data can be read from the input |
| | | */ |
| | | public long nextLongUTF8() throws DataFormatException |
| | | { |
| | | return Long.valueOf(nextString()); |
| | | } |
| | | |
| | | /** |
| | | * Reads the next UTF8-encoded string. |
| | | * |
| | | * @return the next UTF8-encoded string. |
| | | * @throws DataFormatException |
| | | * if no more data can be read from the input |
| | | */ |
| | | public String nextString() throws DataFormatException |
| | | { |
| | | try |
| | | { |
| | | final String s = bytes.getString(findZeroSeparator()); |
| | | bytes.skip(1); // skip the zero separator |
| | | return s; |
| | | } |
| | | catch (IndexOutOfBoundsException e) |
| | | { |
| | | throw new DataFormatException(e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | private int findZeroSeparator() throws DataFormatException |
| | | { |
| | | int offset = 0; |
| | | final int remaining = bytes.remaining(); |
| | | while (bytes.peek(offset) != 0 && offset < remaining) |
| | | { |
| | | offset++; |
| | | } |
| | | if (offset == remaining) |
| | | { |
| | | throw new DataFormatException("No more data to read from"); |
| | | } |
| | | return offset; |
| | | } |
| | | |
| | | /** |
| | | * Reads the next UTF8-encoded strings in the provided collection. |
| | | * |
| | | * @param output |
| | | * the collection where to add the next UTF8-encoded strings |
| | | * @param <TCol> |
| | | * the collection's concrete type |
| | | * @return the provided collection where the next UTF8-encoded strings have |
| | | * been added. |
| | | * @throws DataFormatException |
| | | * if no more data can be read from the input |
| | | */ |
| | | public <TCol extends Collection<String>> TCol nextStrings(TCol output) |
| | | throws DataFormatException |
| | | { |
| | | final int colSize = nextInt(); |
| | | for (int i = 0; i < colSize; i++) |
| | | { |
| | | output.add(nextString()); |
| | | } |
| | | return output; |
| | | } |
| | | |
| | | /** |
| | | * Reads the next CSN. |
| | | * |
| | | * @return the next CSN. |
| | | * @throws DataFormatException |
| | | * if CSN was incorrectly encoded or no more data can be read from |
| | | * the input |
| | | */ |
| | | public CSN nextCSN() throws DataFormatException |
| | | { |
| | | try |
| | | { |
| | | return CSN.valueOf(bytes.getByteSequence(CSN.BYTE_ENCODING_LENGTH)); |
| | | } |
| | | catch (IndexOutOfBoundsException e) |
| | | { |
| | | throw new DataFormatException(e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Reads the next CSN that was encoded as a UTF8 string. |
| | | * |
| | | * @return the next CSN that was encoded as a UTF8 string. |
| | | * @throws DataFormatException |
| | | * if legacy CSN was incorrectly encoded or no more data can be read |
| | | * from the input |
| | | */ |
| | | public CSN nextCSNUTF8() throws DataFormatException |
| | | { |
| | | try |
| | | { |
| | | return CSN.valueOf(nextString()); |
| | | } |
| | | catch (IndexOutOfBoundsException e) |
| | | { |
| | | throw new DataFormatException(e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns whether the scanner has more bytes to consume. |
| | | * |
| | | * @return true if the scanner has more bytes to consume, false otherwise. |
| | | */ |
| | | public boolean isEmpty() |
| | | { |
| | | return bytes.remaining() == 0; |
| | | } |
| | | |
| | | } |
| | |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.types.ByteSequenceReader; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | | |
| | | import static org.opends.server.replication.protocol.ByteArrayBuilder.*; |
| | | import static org.opends.server.replication.protocol.ProtocolVersion.*; |
| | | |
| | | /** |
| | | * Class that define messages sent by a replication domain (DS) to the |
| | |
| | | */ |
| | | public class ChangeTimeHeartbeatMsg extends ReplicationMsg |
| | | { |
| | | private static final byte NORMAL_HEARTBEAT = 0; |
| | | private static final byte REPLICA_OFFLINE_HEARTBEAT = 1; |
| | | |
| | | /** |
| | | * The CSN containing the change time. |
| | | */ |
| | | private final CSN csn; |
| | | /** |
| | | * The type of event carried by this message: normal heartbeat or replica offline. |
| | | */ |
| | | private final byte eventType; |
| | | |
| | | private ChangeTimeHeartbeatMsg(CSN csn, byte eventType) |
| | | { |
| | | this.csn = csn; |
| | | this.eventType = eventType; |
| | | } |
| | | |
| | | /** |
| | | * Constructor of a Change Time Heartbeat message providing the change time |
| | | * value in a CSN. |
| | | * Factory method that builds a change time heartbeat message providing the |
| | | * change time value in a CSN. |
| | | * |
| | | * @param csn |
| | | * The provided CSN. |
| | | * @return a new ChangeTimeHeartbeatMsg |
| | | */ |
| | | public ChangeTimeHeartbeatMsg(CSN csn) |
| | | public static ChangeTimeHeartbeatMsg heartbeatMsg(CSN csn) |
| | | { |
| | | this.csn = csn; |
| | | return new ChangeTimeHeartbeatMsg(csn, NORMAL_HEARTBEAT); |
| | | } |
| | | |
| | | /** |
| | | * Factory method that builds a change time heartbeat message for a replica |
| | | * going offline. |
| | | * |
| | | * @param offlineCSN |
| | | * the serverId and timestamp of the replica going offline |
| | | * @return a new ChangeTimeHeartbeatMsg |
| | | */ |
| | | public static ChangeTimeHeartbeatMsg replicaOfflineMsg(CSN offlineCSN) |
| | | { |
| | | return new ChangeTimeHeartbeatMsg(offlineCSN, REPLICA_OFFLINE_HEARTBEAT); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns whether this is a replica offline message. |
| | | * |
| | | * @return true if this is a replica offline message, false if this is a |
| | | * regular heartbeat message. |
| | | */ |
| | | public boolean isReplicaOfflineMsg() |
| | | { |
| | | return eventType == REPLICA_OFFLINE_HEARTBEAT; |
| | | } |
| | | |
| | | /** |
| | | * Creates a message from a provided byte array. |
| | | * |
| | | * @param in |
| | |
| | | public ChangeTimeHeartbeatMsg(byte[] in, short version) |
| | | throws DataFormatException |
| | | { |
| | | final ByteSequenceReader reader = ByteString.wrap(in).asReader(); |
| | | try |
| | | { |
| | | if (reader.get() != MSG_TYPE_CT_HEARTBEAT) |
| | | final ByteArrayScanner scanner = new ByteArrayScanner(in); |
| | | final byte msgType = scanner.nextByte(); |
| | | if (msgType != MSG_TYPE_CT_HEARTBEAT) |
| | | { |
| | | // Throw better exception below. |
| | | throw new IllegalArgumentException(); |
| | | throw new DataFormatException("input is not a valid " |
| | | + getClass().getSimpleName() + " message: " + msgType); |
| | | } |
| | | |
| | | if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7) |
| | | { |
| | | csn = CSN.valueOf(reader.getByteSequence(CSN.BYTE_ENCODING_LENGTH)); |
| | | } |
| | | else |
| | | { |
| | | csn = CSN.valueOf(reader.getString(CSN.STRING_ENCODING_LENGTH)); |
| | | reader.get(); // Read trailing 0 byte. |
| | | } |
| | | csn = version >= REPLICATION_PROTOCOL_V7 |
| | | ? scanner.nextCSN() |
| | | : scanner.nextCSNUTF8(); |
| | | eventType = version >= REPLICATION_PROTOCOL_V8 |
| | | ? scanner.nextByte() |
| | | : NORMAL_HEARTBEAT; |
| | | |
| | | if (reader.remaining() > 0) |
| | | if (!scanner.isEmpty()) |
| | | { |
| | | // Throw better exception below. |
| | | throw new IllegalArgumentException(); |
| | | throw new DataFormatException( |
| | | "Did not expect to find more bytes to read for " |
| | | + getClass().getSimpleName() + " message."); |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | catch (RuntimeException e) |
| | | { |
| | | // Index out of bounds, bad format, etc. |
| | | throw new DataFormatException("byte[] is not a valid CT_HEARTBEAT msg"); |
| | |
| | | @Override |
| | | public byte[] getBytes(short protocolVersion) |
| | | { |
| | | if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7) |
| | | if (protocolVersion < ProtocolVersion.REPLICATION_PROTOCOL_V7) |
| | | { |
| | | final ByteStringBuilder builder = new ByteStringBuilder( |
| | | CSN.BYTE_ENCODING_LENGTH + 1 /* type + csn */); |
| | | ByteArrayBuilder builder = new ByteArrayBuilder(bytes(1) + csnsUTF8(1)); |
| | | builder.append(MSG_TYPE_CT_HEARTBEAT); |
| | | csn.toByteString(builder); |
| | | builder.appendUTF8(csn); |
| | | return builder.toByteArray(); |
| | | } |
| | | else |
| | | { |
| | | final ByteStringBuilder builder = new ByteStringBuilder( |
| | | CSN.STRING_ENCODING_LENGTH + 2 /* type + csn str + nul */); |
| | | |
| | | final ByteArrayBuilder builder = new ByteArrayBuilder(bytes(1) + csns(1)); |
| | | builder.append(MSG_TYPE_CT_HEARTBEAT); |
| | | builder.append(csn.toString()); |
| | | builder.append((byte) 0); // For compatibility with earlier protocol |
| | | // versions. |
| | | return builder.toByteArray(); |
| | | builder.append(csn); |
| | | if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V8) |
| | | { |
| | | builder.append(eventType); |
| | | } |
| | | return builder.toByteArray(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | { |
| | | return getClass().getSimpleName() + ", csn=" + csn.toStringUI(); |
| | | } |
| | | |
| | | } |
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | * Portions Copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | | |
| | | /** |
| | | * The version utility class for the replication protocol. |
| | | */ |
| | |
| | | public static final short REPLICATION_PROTOCOL_V1_REAL = 49; |
| | | /** |
| | | * The constant for the second version of the replication protocol. |
| | | * Add fields in the header for assured replication. |
| | | * <ul> |
| | | * <li>Add fields in the header for assured replication.</li> |
| | | * </ul> |
| | | */ |
| | | public static final short REPLICATION_PROTOCOL_V2 = 2; |
| | | |
| | | /** |
| | | * The constant for the 3rd version of the replication protocol. |
| | | * Add messages for remote ECL : not used as of today. |
| | | * <ul> |
| | | * <li>Add messages for remote ECL : not used as of today.</li> |
| | | * </ul> |
| | | */ |
| | | public static final short REPLICATION_PROTOCOL_V3 = 3; |
| | | |
| | | /** |
| | | * The constant for the 4th version of the replication protocol. |
| | | * - Add to the body of the ADD/MOD/MODDN/DEL msgs, a list of attribute for |
| | | * ECL entry attributes. |
| | | * - Modified algorithm for choosing a RS to connect to: introduction of a |
| | | * ReplicationServerDSMsg message. |
| | | * -> also added of the server URL in RSInfo of TopologyMsg |
| | | * - Introduction of a StopMsg for proper connections ending. |
| | | * - Initialization failover/flow control |
| | | * <ul> |
| | | * <li>Add to the body of the ADD/MOD/MODDN/DEL msgs, a list of attribute for |
| | | * ECL entry attributes.</li> |
| | | * <li>Modified algorithm for choosing a RS to connect to: introduction of a |
| | | * ReplicationServerDSMsg message.</li> |
| | | * <li>also added of the server URL in RSInfo of TopologyMsg</li> |
| | | * <li>Introduction of a StopMsg for proper connections ending.</li> |
| | | * <li>Initialization failover/flow control</li> |
| | | * </ul> |
| | | */ |
| | | public static final short REPLICATION_PROTOCOL_V4 = 4; |
| | | |
| | | /** |
| | | * The constant for the 5th version of the replication protocol. |
| | | * - Add support for wild-cards in change log included attributes |
| | | * - Add support for specifying additional included attributes for deletes |
| | | * - See OPENDJ-194. |
| | | * <ul> |
| | | * <li>Add support for wild-cards in change log included attributes</li> |
| | | * <li>Add support for specifying additional included attributes for deletes</li> |
| | | * <li>See OPENDJ-194.</li> |
| | | * </ul> |
| | | */ |
| | | public static final short REPLICATION_PROTOCOL_V5 = 5; |
| | | |
| | | /** |
| | | * The constant for the 6th version of the replication protocol. |
| | | * - include DS local URL in the DSInfo of TopologyMsg. |
| | | * <ul> |
| | | * <li>include DS local URL in the DSInfo of TopologyMsg.</li> |
| | | * </ul> |
| | | */ |
| | | public static final short REPLICATION_PROTOCOL_V6 = 6; |
| | | |
| | | /** |
| | | * The constant for the 7th version of the replication protocol. |
| | | * - compact encoding for length, CSNs, and server IDs. |
| | | * <ul> |
| | | * <li>compact encoding for length, CSNs, and server IDs.</li> |
| | | * </ul> |
| | | */ |
| | | public static final short REPLICATION_PROTOCOL_V7 = 7; |
| | | |
| | | /** |
| | | * The constant for the 8th version of the replication protocol. |
| | | * <ul> |
| | | * <li>ChangeTimeHeartbeatMsg now carries an event type, which lets it communicate the replica offline time.</li> |
| | | * </ul> |
| | | */ |
| | | public static final short REPLICATION_PROTOCOL_V8 = 8; |
| | | |
| | | /** |
| | | * The replication protocol version used by the instance of RS/DS in this VM. |
| | | */ |
| | | private static final short CURRENT_VERSION = REPLICATION_PROTOCOL_V7; |
| | | private static final short CURRENT_VERSION = REPLICATION_PROTOCOL_V8; |
| | | |
| | | /** |
| | | * Gets the current version of the replication protocol. |
| | |
| | | * |
| | | * |
| | | * Copyright 2009 Sun Microsystems, Inc. |
| | | * Portions copyright 2013 ForgeRock AS. |
| | | * Portions copyright 2013-2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | |
| | | in[0]); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public byte[] getBytes(short protocolVersion) |
| | | { |
| | |
| | | MSG_TYPE_STOP |
| | | }; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName(); |
| | | } |
| | | } |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2013 ForgeRock AS |
| | | * Copyright 2013-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.types.DN; |
| | | |
| | | /** |
| | |
| | | private final Map<DN, Long> domainToGenerationId = new HashMap<DN, Long>(); |
| | | private final Map<DN, List<Integer>> domainToServerIds = |
| | | new HashMap<DN, List<Integer>>(); |
| | | private final Map<DN, List<CSN>> offlineReplicas = |
| | | new HashMap<DN, List<CSN>>(); |
| | | |
| | | /** |
| | | * Sets the generationId for the supplied replication domain. |
| | |
| | | } |
| | | |
| | | /** |
| | | * Adds the following replica information to the offline list. |
| | | * <p> |
| | | * The CSN is appended to the list kept for the supplied baseDN; that list is |
| | | * created the first time the baseDN is seen. CSNs recorded earlier for the |
| | | * same baseDN are kept, whatever their serverId. |
| | | * |
| | | * @param baseDN |
| | | * the baseDN of the offline replica |
| | | * @param offlineCSN |
| | | * the CSN (serverId + timestamp) of the offline replica |
| | | */ |
| | | public void addOfflineReplica(DN baseDN, CSN offlineCSN) |
| | | { |
| | | List<CSN> offlineCSNs = offlineReplicas.get(baseDN); |
| | | if (offlineCSNs == null) |
| | | { |
| | | offlineCSNs = new LinkedList<CSN>(); |
| | | offlineReplicas.put(baseDN, offlineCSNs); |
| | | } |
| | | offlineCSNs.add(offlineCSN); |
| | | } |
| | | |
| | | /** |
| | | * Returns the Map of domainBaseDN => generationId. |
| | | * |
| | | * @return a Map of domainBaseDN => generationId |
| | |
| | | return domainToServerIds; |
| | | } |
| | | |
| | | /** |
| | | * Returns the Map of {@code domainBaseDN => List<offlineCSN>}. |
| | | * <p> |
| | | * The map handed back is the one held by this object, not a copy. |
| | | * |
| | | * @return a Map of {@code domainBaseDN => List<offlineCSN>}. |
| | | */ |
| | | public Map<DN, List<CSN>> getOfflineReplicas() |
| | | { |
| | | return offlineReplicas; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "domainToGenerationId=" + domainToGenerationId |
| | | + ", domainToServerIds=" + domainToServerIds; |
| | | + ", domainToServerIds=" + domainToServerIds |
| | | + ", offlineReplicas=" + offlineReplicas; |
| | | } |
| | | } |
| | |
| | | * value received, and forwarding the message to the other RSes. |
| | | * @param senderHandler The handler for the server that sent the heartbeat. |
| | | * @param msg The message to process. |
| | | * @throws DirectoryException |
| | | * if a problem occurs |
| | | */ |
| | | void processChangeTimeHeartbeatMsg(ServerHandler senderHandler, |
| | | ChangeTimeHeartbeatMsg msg) |
| | | ChangeTimeHeartbeatMsg msg) throws DirectoryException |
| | | { |
| | | try |
| | | { |
| | | if (msg.isReplicaOfflineMsg()) |
| | | { |
| | | domainDB.replicaOffline(baseDN, msg.getCSN()); |
| | | } |
| | | else |
| | | { |
| | | domainDB.replicaHeartbeat(baseDN, msg.getCSN()); |
| | | } |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e |
| | | .getMessageObject(), e); |
| | | } |
| | | |
| | | if (senderHandler.isDataServer()) |
| | | { |
| | | /* |
| | |
| | | /** |
| | | * Processes a change time heartbeat msg. |
| | | * <p> |
| | | * Traces the message when debug logging is enabled, then hands it to the |
| | | * replication server domain together with this handler as the sender. |
| | | * |
| | | * @param msg The message to be processed. |
| | | * @param msg |
| | | * The message to be processed. |
| | | * @throws DirectoryException |
| | | * When the replication server domain fails to process the message. |
| | | */ |
| | | void process(ChangeTimeHeartbeatMsg msg) |
| | | void process(ChangeTimeHeartbeatMsg msg) throws DirectoryException |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " |
| | | + replicationServerDomain.getLocalRSMonitorInstanceName() + " " |
| | | + this + " processes received msg:\n" + msg); |
| | | } |
| | | replicationServerDomain.processChangeTimeHeartbeatMsg(this, msg); |
| | | } |
| | | |
| | |
| | | // let's update the LDAP server with our current window size and hope |
| | | // that everything will work better in the future. |
| | | // TODO also log an error message. |
| | | WindowMsg msg = new WindowMsg(rcvWindow); |
| | | session.publish(msg); |
| | | } else |
| | | session.publish(new WindowMsg(rcvWindow)); |
| | | } |
| | | else |
| | | { |
| | | // Both the LDAP server and the replication server believe that the |
| | | // window is closed. Let's check the flow control in case we |
| | |
| | | * the replication domain baseDN |
| | | * @param offlineCSN |
| | | * The CSN (serverId and timestamp) for the replica's going offline |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | void replicaOffline(DN baseDN, CSN offlineCSN); |
| | | void replicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException; |
| | | } |
| | |
| | | |
| | | /** |
| | | * Holds the last time each replica was seen alive, whether via updates or |
| | | * heartbeats received. Data is held for each serverId cross domain. |
| | | * heartbeat notifications, or offline notifications. Data is held for each |
| | | * serverId cross domain. |
| | | * <p> |
| | | * Updates are persistent and stored in the replicaDBs, heartbeats are |
| | | * transient and are easily constructed on normal operations. |
| | |
| | | * @return true if the provided baseDN is enabled for the external changelog, |
| | | * false if the provided baseDN is disabled for the external changelog |
| | | * or unknown to multimaster replication. |
| | | * @see MultimasterReplication#isECLEnabledDomain(DN) |
| | | */ |
| | | protected boolean isECLEnabledDomain(DN baseDN) |
| | | { |
| | |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | |
| | | for (Entry<DN, List<CSN>> entry : changelogState.getOfflineReplicas() |
| | | .entrySet()) |
| | | { |
| | | final DN baseDN = entry.getKey(); |
| | | final List<CSN> offlineCSNs = entry.getValue(); |
| | | for (CSN offlineCSN : offlineCSNs) |
| | | { |
| | | if (isECLEnabledDomain(baseDN)) |
| | | { |
| | | replicasOffline.update(baseDN, offlineCSN); |
| | | } |
| | | } |
| | | } |
| | | |
| | | // this will not be used any more. Discard for garbage collection. |
| | | this.changelogState = null; |
| | | } |
| | |
| | | } |
| | | |
| | | private void moveForwardMediumConsistencyPoint(final CSN mcCSN, |
| | | final DN mcBaseDN) |
| | | final DN mcBaseDN) throws ChangelogException |
| | | { |
| | | // record the new point in the RUV, so it becomes the previous cookie for the next change |
| | | mediumConsistencyRUV.update(mcBaseDN, mcCSN); |
| | | mediumConsistency = Pair.of(mcBaseDN, mcCSN); |
| | | final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcCSN.getServerId()); |
| | | if (offlineCSN != null |
| | | && offlineCSN.isOlderThan(mcCSN) |
| | | // If no new updates have been seen for this replica |
| | | && lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN)) |
| | | final int mcServerId = mcCSN.getServerId(); |
| | | final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId); |
| | | final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId); |
| | | if (offlineCSN != null) |
| | | { |
| | | removeCursor(mcBaseDN, mcCSN); |
| | | if (lastAliveCSN != null && offlineCSN.isOlderThan(lastAliveCSN)) |
| | | { |
| | | // a newer last-alive CSN exists: the replica is back online, so forget when it was last offline |
| | | replicasOffline.removeCSN(mcBaseDN, offlineCSN); |
| | | } |
| | | else if (offlineCSN.isOlderThan(mcCSN)) |
| | | { |
| | | /* |
| | | * The replica is not back online and the medium consistency point has |
| | | * gone past its last offline time. Forget everything known about it: |
| | | * close its cursor, then remove its offline CSN from both lastAliveCSNs |
| | | * and the medium consistency RUV. |
| | | */ |
| | | removeCursor(mcBaseDN, mcCSN); |
| | | lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN); |
| | | mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void removeAllCursors() |
| | | { |
| | |
| | | } |
| | | |
| | | private void removeCursor(final DN baseDN, final CSN csn) |
| | | throws ChangelogException |
| | | { |
| | | for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1 |
| | | : allCursors.entrySet()) |
| | |
| | | { |
| | | iter.remove(); |
| | | StaticUtils.close(entry2.getValue()); |
| | | resetNextChangeForInsertDBCursor(); |
| | | return; |
| | | } |
| | | } |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * Hands the offline CSN to {@code dbEnv.addOfflineReplica()} first, then |
| | | * notifies the change number indexer when one is currently set. |
| | | */ |
| | | @Override |
| | | public void replicaOffline(DN baseDN, CSN offlineCSN) |
| | | throws ChangelogException |
| | | { |
| | | dbEnv.addOfflineReplica(baseDN, offlineCSN); |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | if (indexer != null) |
| | | { |
| | | indexer.replicaOffline(baseDN, offlineCSN); |
| | | } |
| | | // TODO save this state in the changelogStateDB? NOTE(review): dbEnv.addOfflineReplica() above may already do so - confirm |
| | | } |
| | | |
| | | /** |
| | |
| | | // wait a bit before purging more |
| | | return DEFAULT_SLEEP; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * After delegating to the superclass, this thread is also interrupted. |
| | | */ |
| | | @Override |
| | | public void initiateShutdown() |
| | | { |
| | | super.initiateShutdown(); |
| | | this.interrupt(); // wake up the purger thread for faster shutdown |
| | | } |
| | | } |
| | | } |
| | |
| | | synchronized (this) |
| | | { |
| | | closeCursor(); |
| | | // previously exhausted cursor must be able to reinitialize themselves |
| | | // Previously exhausted cursors must be able to reinitialize themselves. |
| | | // There is a risk of readLock never being unlocked |
| | | // if the following code is called while the cursor is closed. |
| | | // It is better to let the deadlock happen to help quickly identify |
| | | // and fix such issues with unit tests. |
| | | cursor = db.openReadCursor(lastNonNullCurrentCSN); |
| | | currentChange = cursor.next(); |
| | | if (currentChange != null) |
| | |
| | | } |
| | | |
| | | /** |
| | | * Called by the Gc when the object is garbage collected Release the internal |
| | | * Called by the Gc when the object is garbage collected. Release the internal |
| | | * cursor in case the cursor was badly used and {@link #close()} was never |
| | | * called. |
| | | */ |
| | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | |
| | |
| | | private ReplicationServer replicationServer; |
| | | private final AtomicBoolean isShuttingDown = new AtomicBoolean(false); |
| | | private static final String GENERATION_ID_TAG = "GENID"; |
| | | private static final String OFFLINE_TAG = "OFFLINE"; |
| | | private static final String FIELD_SEPARATOR = " "; |
| | | /** The tracer object for the debug logger. */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | |
| | | } |
| | | result.setDomainGenerationId(baseDN, generationId); |
| | | } |
| | | else if (prefix.equals(OFFLINE_TAG)) |
| | | { |
| | | final String[] str = stringKey.split(FIELD_SEPARATOR, 3); |
| | | final int serverId = toInt(str[1]); |
| | | final DN baseDN = DN.decode(str[2]); |
| | | long timestamp = ByteString.wrap(entry.getValue()).asReader().getLong(); |
| | | if (debugEnabled()) |
| | | { |
| | | debug("has read replica offline: baseDN=" + baseDN + " serverId=" |
| | | + serverId); |
| | | } |
| | | result.addOfflineReplica(baseDN, new CSN(timestamp, 0, serverId)); |
| | | } |
| | | else |
| | | { |
| | | final String[] str = stringData.split(FIELD_SEPARATOR, 2); |
| | |
| | | final DN baseDN = DN.decode(str[1]); |
| | | if (debugEnabled()) |
| | | { |
| | | debug("has read replica: baseDN=" + baseDN + " serverId=" + serverId); |
| | | debug("has read replica: baseDN=" + baseDN + " serverId=" |
| | | + serverId); |
| | | } |
| | | result.addServerIdToDomain(serverId, baseDN); |
| | | } |
| | |
| | | // Opens the DB for the changes received from this server on this domain. |
| | | final Database replicaDB = openDatabase(replicaEntry.getKey()); |
| | | |
| | | putInChangelogStateDBIfNotExist(replicaEntry); |
| | | putInChangelogStateDBIfNotExist(toByteArray(replicaEntry)); |
| | | putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId)); |
| | | return replicaDB; |
| | | } |
| | |
| | | * the replica's serverId |
| | | * @return a database entry for the replica |
| | | */ |
| | | Entry<String, String> toReplicaEntry(DN baseDN, int serverId) |
| | | static Entry<String, String> toReplicaEntry(DN baseDN, int serverId) |
| | | { |
| | | final String key = serverId + FIELD_SEPARATOR + baseDN.toNormalizedString(); |
| | | return new SimpleImmutableEntry<String, String>(key, key); |
| | |
| | | * the domain's generationId |
| | | * @return a database entry for the generationId |
| | | */ |
| | | Entry<String, String> toGenIdEntry(DN baseDN, long generationId) |
| | | static Entry<byte[], byte[]> toGenIdEntry(DN baseDN, long generationId) |
| | | { |
| | | final String normDn = baseDN.toNormalizedString(); |
| | | final String key = GENERATION_ID_TAG + FIELD_SEPARATOR + normDn; |
| | | final String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId |
| | | + FIELD_SEPARATOR + normDn; |
| | | return new SimpleImmutableEntry<String, String>(key, data); |
| | | return new SimpleImmutableEntry<byte[], byte[]>(toBytes(key),toBytes(data)); |
| | | } |
| | | |
| | | private void putInChangelogStateDBIfNotExist(Entry<String, String> entry) |
| | | throws RuntimeException |
| | | /** |
| | | * Converts an {@code Entry<String, String>} to an |
| | | * {@code Entry<byte[], byte[]>}, passing both the key and the value through |
| | | * {@code toBytes()}. |
| | | * |
| | | * @param entry |
| | | * the entry to convert |
| | | * @return the converted entry, as a new immutable entry |
| | | */ |
| | | static Entry<byte[], byte[]> toByteArray(Entry<String, String> entry) |
| | | { |
| | | DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey())); |
| | | return new SimpleImmutableEntry<byte[], byte[]>( |
| | | toBytes(entry.getKey()), |
| | | toBytes(entry.getValue())); |
| | | } |
| | | |
| | | /** |
| | | * Return an entry to store in the changelog state database representing the |
| | | * time a replica went offline. |
| | | * <p> |
| | | * The key is the text {@code OFFLINE}, the replica's serverId and the |
| | | * normalized baseDN, joined with the field separator. The value holds the |
| | | * long returned by {@code offlineCSN.getTime()}; the CSN's sequence number |
| | | * is not stored. |
| | | * |
| | | * @param baseDN |
| | | * the replica's baseDN |
| | | * @param offlineCSN |
| | | * the replica's serverId and offline timestamp |
| | | * @return a database entry representing the time a replica went offline |
| | | */ |
| | | static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN) |
| | | { |
| | | final byte[] key = |
| | | toBytes(OFFLINE_TAG + FIELD_SEPARATOR + offlineCSN.getServerId() |
| | | + FIELD_SEPARATOR + baseDN.toNormalizedString()); |
| | | final ByteStringBuilder data = new ByteStringBuilder(8); // store a long |
| | | data.append(offlineCSN.getTime()); |
| | | return new SimpleImmutableEntry<byte[], byte[]>(key, data.toByteArray()); |
| | | } |
| | | |
| | | private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry) |
| | | throws ChangelogException, RuntimeException |
| | | { |
| | | DatabaseEntry key = new DatabaseEntry(entry.getKey()); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == NOTFOUND) |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try |
| | | { |
| | | data.setData(toBytes(entry.getValue())); |
| | | data.setData(entry.getValue()); |
| | | if (debugEnabled()) |
| | | { |
| | | debug("putting record in the changelogstate Db key=[" |
| | | + entry.getKey() + "] value=[" + entry.getValue() + "]"); |
| | | + toString(entry.getKey()) + "] value=[" |
| | | + toString(entry.getValue()) + "]"); |
| | | } |
| | | changelogStateDb.put(txn, key, data); |
| | | txn.commit(Durability.COMMIT_WRITE_NO_SYNC); |
| | |
| | | */ |
| | | public void clearServerId(DN baseDN, int serverId) throws ChangelogException |
| | | { |
| | | deleteFromChangelogStateDB(toReplicaEntry(baseDN, serverId), |
| | | deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)), |
| | | "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")"); |
| | | } |
| | | |
| | | private void deleteFromChangelogStateDB(Entry<String, ?> entry, |
| | | private void deleteFromChangelogStateDB(Entry<byte[], ?> entry, |
| | | String methodInvocation) throws ChangelogException |
| | | { |
| | | if (debugEnabled()) |
| | |
| | | |
| | | try |
| | | { |
| | | final DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey())); |
| | | final DatabaseEntry key = new DatabaseEntry(entry.getKey()); |
| | | final DatabaseEntry data = new DatabaseEntry(); |
| | | if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == SUCCESS) |
| | | { |
| | |
| | | throw dbe; |
| | | } |
| | | } |
| | | else |
| | | else if (debugEnabled()) |
| | | { |
| | | debug(methodInvocation + " failed: key not found"); |
| | | } |
| | | } |
| | | catch (RuntimeException e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | debug(methodInvocation + " failed: key=[ " + entry.getKey() |
| | | + "] not found"); |
| | | debug(methodInvocation + " error: " + stackTraceToSingleLineString(e)); |
| | | } |
| | | throw new ChangelogException(e); |
| | | } |
| | | } |
| | | } |
| | | catch (RuntimeException dbe) |
| | | |
| | | /** |
| | | * Add the information about an offline replica to the changelog state DB. |
| | | * <p> |
| | | * The record is built by {@code toReplicaOfflineEntry()} and written without |
| | | * first checking whether one is already stored under the same key. |
| | | * |
| | | * @param baseDN |
| | | * the domain of the offline replica |
| | | * @param offlineCSN |
| | | * the offline replica serverId and offline timestamp |
| | | * @throws ChangelogException |
| | | * if a database problem occurred |
| | | */ |
| | | public void addOfflineReplica(DN baseDN, CSN offlineCSN) |
| | | throws ChangelogException |
| | | { |
| | | throw new ChangelogException(dbe); |
| | | // just overwrite any older entry as it is assumed a newly received offline |
| | | // CSN is newer than the previous one |
| | | putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN), |
| | | "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")"); |
| | | } |
| | | |
| | | private void putInChangelogStateDB(Entry<byte[], byte[]> entry, |
| | | String methodInvocation) throws ChangelogException |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | debug(methodInvocation + " starting"); |
| | | } |
| | | |
| | | try |
| | | { |
| | | final DatabaseEntry key = new DatabaseEntry(entry.getKey()); |
| | | final DatabaseEntry data = new DatabaseEntry(entry.getValue()); |
| | | changelogStateDb.put(null, key, data); |
| | | if (debugEnabled()) |
| | | { |
| | | debug(methodInvocation + " succeeded"); |
| | | } |
| | | } |
| | | catch (RuntimeException e) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | debug(methodInvocation + " error: " + stackTraceToSingleLineString(e)); |
| | | } |
| | | throw new ChangelogException(e); |
| | | } |
| | | } |
| | | |
| | |
| | | * |
| | | * |
| | | * Copyright 2009 Sun Microsystems, Inc. |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | * Portions Copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.service; |
| | | |
| | |
| | | import org.opends.server.replication.protocol.Session; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.opends.server.util.TimeThread; |
| | | |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | |
| | |
| | | * @param session The session on which heartbeats are to be sent. |
| | | * @param heartbeatInterval The interval between heartbeats sent |
| | | * (in milliseconds). |
| | | * @param serverId2 The serverId of the sender domain. |
| | | * @param serverId The serverId of the sender domain. |
| | | */ |
| | | public CTHeartbeatPublisherThread(String threadName, Session session, |
| | | long heartbeatInterval, int serverId2) |
| | | long heartbeatInterval, int serverId) |
| | | { |
| | | super(threadName); |
| | | this.session = session; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | this.serverId = serverId2; |
| | | this.serverId = serverId; |
| | | } |
| | | |
| | | /** |
| | |
| | | @Override |
| | | public void run() |
| | | { |
| | | long lastHeartbeatTime = 0; |
| | | try |
| | | { |
| | | if (debugEnabled()) |
| | |
| | | |
| | | while (!shutdown) |
| | | { |
| | | long now = System.currentTimeMillis(); |
| | | final CSN csn = new CSN(TimeThread.getTime(), 0, serverId); |
| | | ChangeTimeHeartbeatMsg ctHeartbeatMsg = new ChangeTimeHeartbeatMsg(csn); |
| | | |
| | | final long now = System.currentTimeMillis(); |
| | | if (now > session.getLastPublishTime() + heartbeatInterval) |
| | | { |
| | | session.publish(ctHeartbeatMsg); |
| | | final CSN csn = new CSN(now, 0, serverId); |
| | | session.publish(ChangeTimeHeartbeatMsg.heartbeatMsg(csn)); |
| | | lastHeartbeatTime = csn.getTime(); |
| | | } |
| | | |
| | | long sleepTime = session.getLastPublishTime() + heartbeatInterval - now; |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | if (shutdown) |
| | | { |
| | | /* |
| | | * Shortcoming: this thread is restarted each time the DS reconnects, |
| | | * e.g. during load balancing. This is not that much of a problem |
| | | * because the ChangeNumberIndexer tolerates receiving replica offline |
| | | * heartbeats and then receiving messages back again. |
| | | */ |
| | | /* |
| | | * However, during shutdown we need to be sure that all pending client |
| | | * operations have either completed or have been aborted before shutting |
| | | * down replication. Otherwise, the medium consistency will move forward |
| | | * without knowing about these changes. |
| | | */ |
| | | final long now = System.currentTimeMillis(); |
| | | final int seqNum = lastHeartbeatTime == now ? 1 : 0; |
| | | final CSN offlineCSN = new CSN(now, seqNum, serverId); |
| | | session.publish(ChangeTimeHeartbeatMsg.replicaOfflineMsg(offlineCSN)); |
| | | } |
| | | } |
| | | catch (IOException e) |
| | | { |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.Arrays; |
| | | import java.util.Collection; |
| | | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | | import org.testng.Assert; |
| | | import org.testng.annotations.Test; |
| | | |
| | | /** |
| | | * Test for {@link ByteArrayBuilder} and {@link ByteArrayScanner} classes. |
| | | * <p> |
| | | * Each value is appended to a builder, then read back from a scanner over the |
| | | * resulting bytes with the matching method, in the same order. |
| | | */ |
| | | @SuppressWarnings("javadoc") |
| | | public class ByteArrayTest extends DirectoryServerTestCase |
| | | { |
| | | |
| | | @Test |
| | | public void testBuilderAppendMethodsAndScannerNextMethods() throws Exception |
| | | { |
| | | final boolean bo = true; |
| | | final byte by = 80; |
| | | final short sh = 42; |
| | | final int i = sh + 1; |
| | | final long l = i + 1; |
| | | final String st = "Yay!"; |
| | | final Collection<String> col = Arrays.asList("foo", "bar", "baz"); |
| | | final CSN csn = new CSN(42424242, 13, 42); |
| | | |
| | | byte[] bytes = new ByteArrayBuilder() |
| | | .append(bo) |
| | | .append(by) |
| | | .append(sh) |
| | | .append(i) |
| | | .append(l) |
| | | .append(st) |
| | | .appendStrings(col) |
| | | .appendUTF8(i) |
| | | .appendUTF8(l) |
| | | .append(csn) |
| | | .appendUTF8(csn) |
| | | .toByteArray(); |
| | | |
| | | final ByteArrayScanner scanner = new ByteArrayScanner(bytes); |
| | | Assert.assertEquals(scanner.nextBoolean(), bo); |
| | | Assert.assertEquals(scanner.nextByte(), by); |
| | | Assert.assertEquals(scanner.nextShort(), sh); |
| | | Assert.assertEquals(scanner.nextInt(), i); |
| | | Assert.assertEquals(scanner.nextLong(), l); |
| | | Assert.assertEquals(scanner.nextString(), st); |
| | | Assert.assertEquals(scanner.nextStrings(new ArrayList<String>()), col); |
| | | Assert.assertEquals(scanner.nextIntUTF8(), i); |
| | | Assert.assertEquals(scanner.nextLongUTF8(), l); |
| | | Assert.assertEquals(scanner.nextCSN(), csn); |
| | | Assert.assertEquals(scanner.nextCSNUTF8(), csn); |
| | | Assert.assertTrue(scanner.isEmpty()); |
| | | } |
| | | } |
| | |
| | | import static org.opends.server.TestCaseUtils.*; |
| | | import static org.opends.server.replication.protocol.OperationContext.*; |
| | | import static org.opends.server.replication.protocol.ProtocolVersion.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | import static org.testng.Assert.*; |
| | | |
| | | /** |
| | |
| | | List<Modification> mods4 = new ArrayList<Modification>(); |
| | | for (int i = 0; i < 10; i++) |
| | | { |
| | | Attribute attr = Attributes.create("description", "string" |
| | | + String.valueOf(i)); |
| | | Modification mod = new Modification(ModificationType.ADD, attr); |
| | | mods4.add(mod); |
| | | Attribute attr = Attributes.create("description", "string" + i); |
| | | mods4.add(new Modification(ModificationType.ADD, attr)); |
| | | } |
| | | |
| | | Attribute attr5 = Attributes.create("namingcontexts", TEST_ROOT_DN_STRING); |
| | |
| | | msg.setAssuredMode(assuredMode); |
| | | msg.setSafeDataLevel(safeDataLevel); |
| | | |
| | | // Set ECL entry inlcuded attributes |
| | | // Set ECL entry included attributes |
| | | if (entryAttrList != null) |
| | | { |
| | | msg.setEclIncludes(entryAttrList); |
| | | } |
| | | |
| | | ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg.generateMsg( |
| | | msg.getBytes(), ProtocolVersion.getCurrentVersion()); |
| | | msg.getBytes(), getCurrentVersion()); |
| | | |
| | | // Test that generated attributes match original attributes. |
| | | assertEquals(generatedMsg.isAssured(), isAssured); |
| | |
| | | |
| | | // Check equals |
| | | ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg.generateMsg( |
| | | msg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1); |
| | | msg.getBytes(), REPLICATION_PROTOCOL_V1); |
| | | assertFalse(msg.equals(null)); |
| | | assertFalse(msg.equals(new Object())); |
| | | |
| | |
| | | } |
| | | msg.setInitiatorsName("johnny h"); |
| | | DeleteMsg generatedMsg = (DeleteMsg) ReplicationMsg.generateMsg( |
| | | msg.getBytes(), ProtocolVersion.getCurrentVersion()); |
| | | msg.getBytes(), getCurrentVersion()); |
| | | |
| | | assertEquals(msg.toString(), generatedMsg.toString()); |
| | | assertEquals(msg.getInitiatorsName(), generatedMsg.getInitiatorsName()); |
| | |
| | | msg.setEclIncludes(entryAttrList); |
| | | } |
| | | |
| | | ModifyDNMsg generatedMsg = (ModifyDNMsg) ReplicationMsg |
| | | .generateMsg(msg.getBytes(), ProtocolVersion.getCurrentVersion()); |
| | | ModifyDNMsg generatedMsg = (ModifyDNMsg) ReplicationMsg.generateMsg( |
| | | msg.getBytes(), getCurrentVersion()); |
| | | |
| | | // Test that generated attributes match original attributes. |
| | | assertEquals(generatedMsg.isAssured(), isAssured); |
| | |
| | | msg.setEclIncludes(entryAttrList); |
| | | } |
| | | |
| | | AddMsg generatedMsg = (AddMsg) ReplicationMsg.generateMsg(msg |
| | | .getBytes(), ProtocolVersion.getCurrentVersion()); |
| | | AddMsg generatedMsg = (AddMsg) ReplicationMsg.generateMsg( |
| | | msg.getBytes(), getCurrentVersion()); |
| | | assertEquals(generatedMsg.getBytes(), msg.getBytes()); |
| | | assertEquals(generatedMsg.toString(), msg.toString()); |
| | | |
| | |
| | | new StopMsg(msg.getBytes(getCurrentVersion())); |
| | | } |
| | | |
| | | @Test |
| | | public void changeTimeHeartbeatMsgTest() throws Exception |
| | | { |
| | | final short v1 = REPLICATION_PROTOCOL_V1; |
| | | final short v7 = REPLICATION_PROTOCOL_V7; |
| | | final short v8 = REPLICATION_PROTOCOL_V8; |
| | | |
| | | final CSN csn = new CSN(System.currentTimeMillis(), 0, 42); |
| | | final ChangeTimeHeartbeatMsg heartbeatMsg = ChangeTimeHeartbeatMsg.heartbeatMsg(csn); |
| | | assertCTHearbeatMsg(heartbeatMsg, v1, false); |
| | | assertCTHearbeatMsg(heartbeatMsg, v7, false); |
| | | assertCTHearbeatMsg(heartbeatMsg, v8, false); |
| | | |
| | | final ChangeTimeHeartbeatMsg offlineMsg = ChangeTimeHeartbeatMsg.replicaOfflineMsg(csn); |
| | | assertCTHearbeatMsg(offlineMsg, v1, false); |
| | | assertCTHearbeatMsg(offlineMsg, v7, false); |
| | | assertCTHearbeatMsg(offlineMsg, v8, true); |
| | | } |
| | | |
| | | private void assertCTHearbeatMsg(ChangeTimeHeartbeatMsg heartbeatMsg, |
| | | short version, boolean expected) throws DataFormatException |
| | | { |
| | | final byte[] bytes = heartbeatMsg.getBytes(version); |
| | | ChangeTimeHeartbeatMsg decodedMsg = new ChangeTimeHeartbeatMsg(bytes, version); |
| | | assertEquals(decodedMsg.isReplicaOfflineMsg(), expected); |
| | | } |
| | | |
| | | /** |
| | | * Test that WindowMsg encoding and decoding works |
| | | * by checking that : msg == new WindowMsg(msg.getBytes()). |
| | |
| | | throws Exception |
| | | { |
| | | TopologyMsg msg = new TopologyMsg(dsList, rsList); |
| | | TopologyMsg newMsg = new TopologyMsg(msg.getBytes(getCurrentVersion()), |
| | | ProtocolVersion.getCurrentVersion()); |
| | | TopologyMsg newMsg = new TopologyMsg(msg.getBytes(getCurrentVersion()), getCurrentVersion()); |
| | | assertEquals(msg.getReplicaInfos(), newMsg.getReplicaInfos()); |
| | | assertEquals(msg.getRsInfos(), newMsg.getRsInfos()); |
| | | } |
| | |
| | | msg.setServerState(sid3, s3, now+3, false); |
| | | |
| | | byte[] b = msg.getBytes(getCurrentVersion()); |
| | | MonitorMsg newMsg = new MonitorMsg(b, ProtocolVersion.getCurrentVersion()); |
| | | MonitorMsg newMsg = new MonitorMsg(b, getCurrentVersion()); |
| | | |
| | | assertEquals(rsState, msg.getReplServerDbState()); |
| | | assertEquals(newMsg.getReplServerDbState().toString(), |
| | | msg.getReplServerDbState().toString()); |
| | | |
| | | Iterator<Integer> it = newMsg.ldapIterator(); |
| | | while (it.hasNext()) |
| | | for (int sid : toIterable(newMsg.ldapIterator())) |
| | | { |
| | | int sid = it.next(); |
| | | ServerState s = newMsg.getLDAPServerState(sid); |
| | | if (sid == sid1) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | Iterator<Integer> it2 = newMsg.rsIterator(); |
| | | while (it2.hasNext()) |
| | | for (int sid : toIterable(newMsg.rsIterator())) |
| | | { |
| | | int sid = it2.next(); |
| | | ServerState s = newMsg.getRSServerState(sid); |
| | | if (sid == sid3) |
| | | { |
| | |
| | | encodemsg += (t4 - t31); |
| | | |
| | | // getBytes |
| | | byte[] bytes = generatedMsg.getBytes(ProtocolVersion.getCurrentVersion()); |
| | | byte[] bytes = generatedMsg.getBytes(getCurrentVersion()); |
| | | t5 = System.nanoTime(); |
| | | getbytes += (t5 - t4); |
| | | |
| | |
| | | encodemsg += (t4 - t31); |
| | | |
| | | // getBytes |
| | | byte[] bytes = generatedMsg.getBytes(ProtocolVersion.getCurrentVersion()); |
| | | byte[] bytes = generatedMsg.getBytes(getCurrentVersion()); |
| | | t5 = System.nanoTime(); |
| | | getbytes += (t5 - t4); |
| | | |
| | |
| | | encodemsg += (t4 - t31); |
| | | |
| | | // getBytes |
| | | byte[] bytes = generatedMsg.getBytes(ProtocolVersion.getCurrentVersion()); |
| | | byte[] bytes = generatedMsg.getBytes(getCurrentVersion()); |
| | | t5 = System.nanoTime(); |
| | | getbytes += (t5 - t4); |
| | | |
| | |
| | | |
| | | final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4); |
| | | publishUpdateMsg(msg4); |
| | | // MCP moved forward after receiving update from serverId1 |
| | | // MCP moves forward after receiving update from serverId1 |
| | | // (last replica in the domain) |
| | | assertExternalChangelogContent(msg1, msg2, msg4); |
| | | |
| | | // serverId2 comes online again |
| | | final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId2, 5); |
| | | publishUpdateMsg(msg5); |
| | | // MCP does not move until it knows what happens to serverId1 |
| | | assertExternalChangelogContent(msg1, msg2, msg4); |
| | | sendHeartbeat(BASE_DN1, serverId1, 6); |
| | | // MCP moves forward |
| | | assertExternalChangelogContent(msg1, msg2, msg4, msg5); |
| | | } |
| | | |
| | | private void addReplica(DN baseDN, int serverId) throws Exception |
| | |
| | | waitForWaitingState(cnIndexer); |
| | | } |
| | | |
| | | private void stopCNIndexer() |
| | | private void stopCNIndexer() throws Exception |
| | | { |
| | | if (cnIndexer != null) |
| | | { |
| | | cnIndexer.initiateShutdown(); |
| | | cnIndexer.interrupt(); |
| | | cnIndexer.join(); |
| | | } |
| | | } |
| | | |
| | |
| | | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.DN; |
| | |
| | | import com.sleepycat.je.Environment; |
| | | |
| | | import static java.util.Arrays.*; |
| | | import static java.util.Collections.*; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.replication.server.changelog.je.ReplicationDbEnv.*; |
| | |
| | | @DataProvider |
| | | public Object[][] changelogStateDataProvider() throws Exception |
| | | { |
| | | final int genId = 524157415; |
| | | final int id1 = 42; |
| | | final int id2 = 346; |
| | | final int t1 = 1956245524; |
| | | return new Object[][] { |
| | | { DN.decode("dc=example,dc=com"), 524157415, asList(42, 346) }, |
| | | // test with a space in the baseDN (space is the field separator in the DB) |
| | | { DN.decode("cn=admin data"), 524157415, asList(42, 346) }, |
| | | }; |
| | | { DN.decode("dc=example,dc=com"), genId, EMPTY_LIST, EMPTY_LIST }, |
| | | { DN.decode("dc=example,dc=com"), genId, asList(id1, id2), |
| | | asList(new CSN(id2, 0, t1)) }, |
| | | // test with a space in the baseDN (space is the field separator in the |
| | | // DB) |
| | | { DN.decode("cn=admin data"), genId, asList(id1, id2), EMPTY_LIST }, }; |
| | | } |
| | | |
| | | @Test(dataProvider = "changelogStateDataProvider") |
| | | public void encodeDecodeChangelogState(DN baseDN, long generationId, |
| | | List<Integer> serverIds) throws Exception |
| | | List<Integer> replicas, List<CSN> offlineReplicas) throws Exception |
| | | { |
| | | final ReplicationDbEnv changelogStateDB = new TestableReplicationDbEnv(); |
| | | |
| | | // encode data |
| | | final Map<byte[], byte[]> wholeState = new LinkedHashMap<byte[], byte[]>(); |
| | | put(wholeState, changelogStateDB.toGenIdEntry(baseDN, generationId)); |
| | | for (Integer serverId : serverIds) |
| | | put(wholeState, toGenIdEntry(baseDN, generationId)); |
| | | for (Integer serverId : replicas) |
| | | { |
| | | put(wholeState, changelogStateDB.toReplicaEntry(baseDN, serverId)); |
| | | put(wholeState, toByteArray(toReplicaEntry(baseDN, serverId))); |
| | | } |
| | | for (CSN offlineCSN : offlineReplicas) |
| | | { |
| | | put(wholeState, toReplicaOfflineEntry(baseDN, offlineCSN)); |
| | | } |
| | | |
| | | // decode data |
| | |
| | | changelogStateDB.decodeChangelogState(wholeState); |
| | | assertThat(state.getDomainToGenerationId()).containsExactly( |
| | | entry(baseDN, generationId)); |
| | | if (!replicas.isEmpty()) |
| | | { |
| | | assertThat(state.getDomainToServerIds()).containsExactly( |
| | | entry(baseDN, serverIds)); |
| | | entry(baseDN, replicas)); |
| | | } |
| | | else |
| | | { |
| | | assertThat(state.getDomainToServerIds()).isEmpty(); |
| | | } |
| | | if (!offlineReplicas.isEmpty()) |
| | | { |
| | | assertThat(state.getOfflineReplicas()).containsExactly( |
| | | entry(baseDN, offlineReplicas)); |
| | | } |
| | | else |
| | | { |
| | | assertThat(state.getOfflineReplicas()).isEmpty(); |
| | | } |
| | | } |
| | | |
| | | private void put(Map<byte[], byte[]> map, Entry<String, String> entry) |
| | | private void put(Map<byte[], byte[]> map, Entry<byte[], byte[]> entry) |
| | | { |
| | | map.put(toBytes(entry.getKey()), toBytes(entry.getValue())); |
| | | map.put(entry.getKey(), entry.getValue()); |
| | | } |
| | | |
| | | } |