| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.Collection; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | | |
| | | /** |
| | | * Byte array builder class encodes data into byte arrays to send messages over |
| | | * the replication protocol. Built on top of {@link ByteStringBuilder}, it |
| | | * isolates the latter against legacy type conversions from the replication |
| | | * protocol. It exposes a fluent API. |
| | | * <p> |
| | | * Strings, and the values appended through the {@code appendUTF8} methods, are |
| | | * written as their UTF-8 bytes followed by a single zero byte separator. |
| | | * |
| | | * @see ByteArrayScanner ByteArrayScanner class that decodes messages built with |
| | | * current class. |
| | | */ |
| | | public class ByteArrayBuilder |
| | | { |
| | | |
| | | /** This is the null byte, also known as zero byte. */ |
| | | public static final byte NULL_BYTE = 0; |
| | | /** The underlying builder that all the append methods write to. */ |
| | | private final ByteStringBuilder builder; |
| | | |
| | | /** |
| | | * Constructs a ByteArrayBuilder backed by a default ByteStringBuilder. |
| | | */ |
| | | public ByteArrayBuilder() |
| | | { |
| | | builder = new ByteStringBuilder(); |
| | | } |
| | | |
| | | /** |
| | | * Constructs a ByteArrayBuilder. |
| | | * |
| | | * @param capacity |
| | | * the capacity of the underlying ByteStringBuilder |
| | | */ |
| | | public ByteArrayBuilder(int capacity) |
| | | { |
| | | builder = new ByteStringBuilder(capacity); |
| | | } |
| | | |
| | | /** |
| | | * Append a boolean to this ByteArrayBuilder. The boolean is written as a |
| | | * single byte: 1 for true, 0 for false. |
| | | * |
| | | * @param b |
| | | * the boolean to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder append(boolean b) |
| | | { |
| | | append((byte) (b ? 1 : 0)); |
| | | return this; |
| | | } |
| | | |
| | | /** |
| | | * Append a byte to this ByteArrayBuilder. |
| | | * |
| | | * @param b |
| | | * the byte to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder append(byte b) |
| | | { |
| | | builder.append(b); |
| | | return this; |
| | | } |
| | | |
| | | /** |
| | | * Append a short to this ByteArrayBuilder. |
| | | * |
| | | * @param s |
| | | * the short to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder append(short s) |
| | | { |
| | | builder.append(s); |
| | | return this; |
| | | } |
| | | |
| | | /** |
| | | * Append an int to this ByteArrayBuilder. |
| | | * |
| | | * @param i |
| | | * the int to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder append(int i) |
| | | { |
| | | builder.append(i); |
| | | return this; |
| | | } |
| | | |
| | | /** |
| | | * Append a long to this ByteArrayBuilder. |
| | | * |
| | | * @param l |
| | | * the long to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder append(long l) |
| | | { |
| | | builder.append(l); |
| | | return this; |
| | | } |
| | | |
| | | /** |
| | | * Append an int to this ByteArrayBuilder by converting it to a String then |
| | | * encoding that string to a UTF-8 byte array. |
| | | * |
| | | * @param i |
| | | * the int to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder appendUTF8(int i) |
| | | { |
| | | return append(Integer.toString(i)); |
| | | } |
| | | |
| | | /** |
| | | * Append a long to this ByteArrayBuilder by converting it to a String then |
| | | * encoding that string to a UTF-8 byte array. |
| | | * |
| | | * @param l |
| | | * the long to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder appendUTF8(long l) |
| | | { |
| | | return append(Long.toString(l)); |
| | | } |
| | | |
| | | /** |
| | | * Append a Collection of Strings to this ByteArrayBuilder. The size of the |
| | | * collection is appended first as an int, followed by each String in |
| | | * iteration order. |
| | | * |
| | | * @param col |
| | | * the Collection of Strings to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder appendStrings(Collection<String> col) |
| | | { |
| | | append(col.size()); |
| | | for (String s : col) |
| | | { |
| | | append(s); |
| | | } |
| | | return this; |
| | | } |
| | | |
| | | /** |
| | | * Append a String to this ByteArrayBuilder. The String is written as its |
| | | * UTF-8 bytes followed by a zero byte separator. |
| | | * |
| | | * @param s |
| | | * the String to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder append(String s) |
| | | { |
| | | try |
| | | { |
| | | append(s.getBytes("UTF-8")); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | throw new RuntimeException("Should never happen", e); |
| | | } |
| | | return this; |
| | | } |
| | | |
| | | /** |
| | | * Append a CSN to this ByteArrayBuilder, using the byte encoding produced by |
| | | * {@code CSN.toByteString(ByteStringBuilder)}. |
| | | * |
| | | * @param csn |
| | | * the CSN to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder append(CSN csn) |
| | | { |
| | | csn.toByteString(builder); |
| | | return this; |
| | | } |
| | | |
| | | /** |
| | | * Append a CSN to this ByteArrayBuilder by converting it to a String then |
| | | * encoding that string to a UTF-8 byte array. |
| | | * |
| | | * @param csn |
| | | * the CSN to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | public ByteArrayBuilder appendUTF8(CSN csn) |
| | | { |
| | | append(csn.toString()); |
| | | return this; |
| | | } |
| | | |
| | | /** |
| | | * Appends the provided bytes one by one, then a zero byte separator. |
| | | * |
| | | * @param sBytes |
| | | * the already encoded bytes to append. |
| | | * @return this ByteArrayBuilder |
| | | */ |
| | | private ByteArrayBuilder append(byte[] sBytes) |
| | | { |
| | | for (byte b : sBytes) |
| | | { |
| | | append(b); |
| | | } |
| | | append(NULL_BYTE); // zero separator |
| | | return this; |
| | | } |
| | | |
| | | /** |
| | | * Converts the content of this ByteArrayBuilder to a byte array. |
| | | * |
| | | * @return the content of this ByteArrayBuilder converted to a byte array. |
| | | */ |
| | | public byte[] toByteArray() |
| | | { |
| | | return builder.toByteArray(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return builder.toString(); |
| | | } |
| | | |
| | | /** |
| | | * Helper method that returns the number of bytes that would be used by the |
| | | * boolean fields when appended to a ByteArrayBuilder. |
| | | * |
| | | * @param nbFields |
| | | * the number of boolean fields that will be appended to a |
| | | * ByteArrayBuilder |
| | | * @return the number of bytes occupied by the appended boolean fields. |
| | | */ |
| | | public static int booleans(int nbFields) |
| | | { |
| | | return nbFields; |
| | | } |
| | | |
| | | /** |
| | | * Helper method that returns the number of bytes that would be used by the |
| | | * byte fields when appended to a ByteArrayBuilder. |
| | | * |
| | | * @param nbFields |
| | | * the number of byte fields that will be appended to a |
| | | * ByteArrayBuilder |
| | | * @return the number of bytes occupied by the appended byte fields. |
| | | */ |
| | | public static int bytes(int nbFields) |
| | | { |
| | | return nbFields; |
| | | } |
| | | |
| | | /** |
| | | * Helper method that returns the number of bytes that would be used by the |
| | | * short fields when appended to a ByteArrayBuilder. |
| | | * |
| | | * @param nbFields |
| | | * the number of short fields that will be appended to a |
| | | * ByteArrayBuilder |
| | | * @return the number of bytes occupied by the appended short fields. |
| | | */ |
| | | public static int shorts(int nbFields) |
| | | { |
| | | return 2 * nbFields; |
| | | } |
| | | |
| | | /** |
| | | * Helper method that returns the number of bytes that would be used by the |
| | | * int fields when appended to a ByteArrayBuilder. |
| | | * |
| | | * @param nbFields |
| | | * the number of int fields that will be appended to a |
| | | * ByteArrayBuilder |
| | | * @return the number of bytes occupied by the appended int fields. |
| | | */ |
| | | public static int ints(int nbFields) |
| | | { |
| | | return 4 * nbFields; |
| | | } |
| | | |
| | | /** |
| | | * Helper method that returns the number of bytes that would be used by the |
| | | * long fields when appended to a ByteArrayBuilder. |
| | | * |
| | | * @param nbFields |
| | | * the number of long fields that will be appended to a |
| | | * ByteArrayBuilder |
| | | * @return the number of bytes occupied by the appended long fields. |
| | | */ |
| | | public static int longs(int nbFields) |
| | | { |
| | | return 8 * nbFields; |
| | | } |
| | | |
| | | /** |
| | | * Helper method that returns the number of bytes that would be used by the |
| | | * CSN fields when appended to a ByteArrayBuilder. |
| | | * |
| | | * @param nbFields |
| | | * the number of CSN fields that will be appended to a |
| | | * ByteArrayBuilder |
| | | * @return the number of bytes occupied by the appended CSN fields. |
| | | */ |
| | | public static int csns(int nbFields) |
| | | { |
| | | return CSN.BYTE_ENCODING_LENGTH * nbFields; |
| | | } |
| | | |
| | | /** |
| | | * Helper method that returns the number of bytes that would be used by the |
| | | * CSN fields encoded as a UTF8 string when appended to a ByteArrayBuilder. |
| | | * Each CSN appended with {@link #appendUTF8(CSN)} is followed by its own |
| | | * zero byte separator, so one extra byte is counted per field. |
| | | * |
| | | * @param nbFields |
| | | * the number of CSN fields that will be appended to a |
| | | * ByteArrayBuilder |
| | | * @return the number of bytes occupied by the appended legacy-encoded CSN |
| | | * fields. |
| | | */ |
| | | public static int csnsUTF8(int nbFields) |
| | | { |
| | | return (CSN.STRING_ENCODING_LENGTH + 1 /* null byte */) * nbFields; |
| | | } |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | | import java.util.Collection; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.types.ByteSequenceReader; |
| | | import org.opends.server.types.ByteString; |
| | | |
| | | /** |
| | | * Byte array scanner class helps decode data from byte arrays received via |
| | | * messages over the replication protocol. Built on top of |
| | | * {@link ByteSequenceReader}, it isolates the latter against legacy type |
| | | * conversions from the replication protocol. |
| | | * <p> |
| | | * Running past the end of the input is reported as a |
| | | * {@link DataFormatException} rather than an IndexOutOfBoundsException. |
| | | * |
| | | * @see ByteArrayBuilder ByteArrayBuilder class that encodes messages read with |
| | | * current class. |
| | | */ |
| | | public class ByteArrayScanner |
| | | { |
| | | |
| | | /** The reader over the input byte array; every read advances it. */ |
| | | private final ByteSequenceReader bytes; |
| | | |
| | | /** |
| | | * Builds a ByteArrayScanner object that will read from the supplied byte |
| | | * array. |
| | | * |
| | | * @param bytes |
| | | * the byte array input that will be read from |
| | | */ |
| | | public ByteArrayScanner(byte[] bytes) |
| | | { |
| | | this.bytes = ByteString.wrap(bytes).asReader(); |
| | | } |
| | | |
| | | /** |
| | | * Reads the next boolean. The boolean is read as one byte; any non-zero |
| | | * value is considered true. |
| | | * |
| | | * @return the next boolean |
| | | * @throws DataFormatException |
| | | * if no more data can be read from the input |
| | | */ |
| | | public boolean nextBoolean() throws DataFormatException |
| | | { |
| | | return nextByte() != 0; |
| | | } |
| | | |
| | | /** |
| | | * Reads the next byte. |
| | | * |
| | | * @return the next byte |
| | | * @throws DataFormatException |
| | | * if no more data can be read from the input |
| | | */ |
| | | public byte nextByte() throws DataFormatException |
| | | { |
| | | try |
| | | { |
| | | return bytes.get(); |
| | | } |
| | | catch (IndexOutOfBoundsException e) |
| | | { |
| | | throw toDataFormatException(e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Reads the next short. |
| | | * |
| | | * @return the next short |
| | | * @throws DataFormatException |
| | | * if no more data can be read from the input |
| | | */ |
| | | public short nextShort() throws DataFormatException |
| | | { |
| | | try |
| | | { |
| | | return bytes.getShort(); |
| | | } |
| | | catch (IndexOutOfBoundsException e) |
| | | { |
| | | throw toDataFormatException(e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Reads the next int. |
| | | * |
| | | * @return the next int |
| | | * @throws DataFormatException |
| | | * if no more data can be read from the input |
| | | */ |
| | | public int nextInt() throws DataFormatException |
| | | { |
| | | try |
| | | { |
| | | return bytes.getInt(); |
| | | } |
| | | catch (IndexOutOfBoundsException e) |
| | | { |
| | | throw toDataFormatException(e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Reads the next long. |
| | | * |
| | | * @return the next long |
| | | * @throws DataFormatException |
| | | * if no more data can be read from the input |
| | | */ |
| | | public long nextLong() throws DataFormatException |
| | | { |
| | | try |
| | | { |
| | | return bytes.getLong(); |
| | | } |
| | | catch (IndexOutOfBoundsException e) |
| | | { |
| | | throw toDataFormatException(e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Reads the next int that was encoded as a UTF8 string. |
| | | * |
| | | * @return the next int that was encoded as a UTF8 string. |
| | | * @throws DataFormatException |
| | | * if no more data can be read from the input |
| | | * @throws NumberFormatException |
| | | * if the string that was read does not contain a parsable int |
| | | */ |
| | | public int nextIntUTF8() throws DataFormatException |
| | | { |
| | | // parseInt returns the primitive directly, no boxing round trip |
| | | return Integer.parseInt(nextString()); |
| | | } |
| | | |
| | | /** |
| | | * Reads the next long that was encoded as a UTF8 string. |
| | | * |
| | | * @return the next long that was encoded as a UTF8 string. |
| | | * @throws DataFormatException |
| | | * if no more data can be read from the input |
| | | * @throws NumberFormatException |
| | | * if the string that was read does not contain a parsable long |
| | | */ |
| | | public long nextLongUTF8() throws DataFormatException |
| | | { |
| | | // parseLong returns the primitive directly, no boxing round trip |
| | | return Long.parseLong(nextString()); |
| | | } |
| | | |
| | | /** |
| | | * Reads the next UTF8-encoded string. The string extends up to the next |
| | | * zero byte separator, which is consumed but not returned. |
| | | * |
| | | * @return the next UTF8-encoded string. |
| | | * @throws DataFormatException |
| | | * if no zero separator is found or no more data can be read from |
| | | * the input |
| | | */ |
| | | public String nextString() throws DataFormatException |
| | | { |
| | | try |
| | | { |
| | | final String s = bytes.getString(findZeroSeparator()); |
| | | bytes.skip(1); // skip the zero separator |
| | | return s; |
| | | } |
| | | catch (IndexOutOfBoundsException e) |
| | | { |
| | | throw toDataFormatException(e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Finds the offset, relative to the current position, of the next zero byte |
| | | * separator without consuming any data. |
| | | * |
| | | * @return the number of bytes between the current position and the next zero |
| | | * byte |
| | | * @throws DataFormatException |
| | | * if the remaining data contains no zero byte |
| | | */ |
| | | private int findZeroSeparator() throws DataFormatException |
| | | { |
| | | int offset = 0; |
| | | final int remaining = bytes.remaining(); |
| | | // check the bound first so that peek() is never asked for a byte past the end |
| | | while (offset < remaining && bytes.peek(offset) != 0) |
| | | { |
| | | offset++; |
| | | } |
| | | if (offset == remaining) |
| | | { |
| | | throw new DataFormatException("No more data to read from"); |
| | | } |
| | | return offset; |
| | | } |
| | | |
| | | /** |
| | | * Reads the next UTF8-encoded strings in the provided collection. The number |
| | | * of strings to read is itself read first, as an int. |
| | | * |
| | | * @param output |
| | | * the collection where to add the next UTF8-encoded strings |
| | | * @param <TCol> |
| | | * the collection's concrete type |
| | | * @return the provided collection where the next UTF8-encoded strings have |
| | | * been added. |
| | | * @throws DataFormatException |
| | | * if no more data can be read from the input |
| | | */ |
| | | public <TCol extends Collection<String>> TCol nextStrings(TCol output) |
| | | throws DataFormatException |
| | | { |
| | | final int colSize = nextInt(); |
| | | for (int i = 0; i < colSize; i++) |
| | | { |
| | | output.add(nextString()); |
| | | } |
| | | return output; |
| | | } |
| | | |
| | | /** |
| | | * Reads the next CSN. |
| | | * |
| | | * @return the next CSN. |
| | | * @throws DataFormatException |
| | | * if CSN was incorrectly encoded or no more data can be read from |
| | | * the input |
| | | */ |
| | | public CSN nextCSN() throws DataFormatException |
| | | { |
| | | try |
| | | { |
| | | return CSN.valueOf(bytes.getByteSequence(CSN.BYTE_ENCODING_LENGTH)); |
| | | } |
| | | catch (IndexOutOfBoundsException e) |
| | | { |
| | | throw toDataFormatException(e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Reads the next CSN that was encoded as a UTF8 string. |
| | | * |
| | | * @return the next CSN that was encoded as a UTF8 string. |
| | | * @throws DataFormatException |
| | | * if legacy CSN was incorrectly encoded or no more data can be read |
| | | * from the input |
| | | */ |
| | | public CSN nextCSNUTF8() throws DataFormatException |
| | | { |
| | | try |
| | | { |
| | | return CSN.valueOf(nextString()); |
| | | } |
| | | catch (IndexOutOfBoundsException e) |
| | | { |
| | | throw toDataFormatException(e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns whether the scanner has no more bytes to consume. |
| | | * |
| | | * @return true if the scanner has no more bytes to consume, false if there |
| | | * are still bytes left to read. |
| | | */ |
| | | public boolean isEmpty() |
| | | { |
| | | return bytes.remaining() == 0; |
| | | } |
| | | |
| | | /** |
| | | * Converts an out of bounds read into the checked exception exposed by this |
| | | * class, keeping the same message and recording the original exception as |
| | | * the cause. |
| | | * |
| | | * @param e |
| | | * the exception raised while reading past the end of the input |
| | | * @return the DataFormatException to throw |
| | | */ |
| | | private static DataFormatException toDataFormatException( |
| | | IndexOutOfBoundsException e) |
| | | { |
| | | final DataFormatException dfe = new DataFormatException(e.getMessage()); |
| | | // DataFormatException has no (message, cause) constructor |
| | | dfe.initCause(e); |
| | | return dfe; |
| | | } |
| | | |
| | | } |
| | |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.types.ByteSequenceReader; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | | |
| | | import static org.opends.server.replication.protocol.ByteArrayBuilder.*; |
| | | import static org.opends.server.replication.protocol.ProtocolVersion.*; |
| | | |
| | | /** |
| | | * Class that define messages sent by a replication domain (DS) to the |
| | |
| | | */ |
| | | public class ChangeTimeHeartbeatMsg extends ReplicationMsg |
| | | { |
| | | private static final byte NORMAL_HEARTBEAT = 0; |
| | | private static final byte REPLICA_OFFLINE_HEARTBEAT = 1; |
| | | |
| | | /** |
| | | * The CSN containing the change time. |
| | | */ |
| | | private final CSN csn; |
| | | /** |
| | | * The type of this heartbeat: normal heartbeat or replica offline heartbeat. |
| | | */ |
| | | private final byte eventType; |
| | | |
| | | /** |
| | | * Builds a heartbeat message from its two fields. Used by the static factory |
| | | * methods of this class. |
| | | * |
| | | * @param csn |
| | | * the CSN carried by this message |
| | | * @param eventType |
| | | * the heartbeat type, either NORMAL_HEARTBEAT or |
| | | * REPLICA_OFFLINE_HEARTBEAT |
| | | */ |
| | | private ChangeTimeHeartbeatMsg(CSN csn, byte eventType) |
| | | { |
| | | this.csn = csn; |
| | | this.eventType = eventType; |
| | | } |
| | | |
| | | /** |
| | | * Constructor of a Change Time Heartbeat message providing the change time |
| | | * value in a CSN. |
| | | * Factory method that builds a change time heartbeat message providing the |
| | | * change time value in a CSN. |
| | | * |
| | | * @param csn |
| | | * The provided CSN. |
| | | * @return a new ChangeTimeHeartbeatMsg |
| | | */ |
| | | public ChangeTimeHeartbeatMsg(CSN csn) |
| | | public static ChangeTimeHeartbeatMsg heartbeatMsg(CSN csn) |
| | | { |
| | | this.csn = csn; |
| | | return new ChangeTimeHeartbeatMsg(csn, NORMAL_HEARTBEAT); |
| | | } |
| | | |
| | | /** |
| | | * Factory method that builds a change time heartbeat message for a replica |
| | | * going offline. The returned message carries the REPLICA_OFFLINE_HEARTBEAT |
| | | * event type, so {@link #isReplicaOfflineMsg()} returns true for it. |
| | | * |
| | | * @param offlineCSN |
| | | * the serverId and timestamp of the replica going offline |
| | | * @return a new ChangeTimeHeartbeatMsg |
| | | */ |
| | | public static ChangeTimeHeartbeatMsg replicaOfflineMsg(CSN offlineCSN) |
| | | { |
| | | return new ChangeTimeHeartbeatMsg(offlineCSN, REPLICA_OFFLINE_HEARTBEAT); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns whether this is a replica offline message, i.e. whether its event |
| | | * type is REPLICA_OFFLINE_HEARTBEAT. |
| | | * |
| | | * @return true if this is a replica offline message, false if this is a |
| | | * regular heartbeat message. |
| | | */ |
| | | public boolean isReplicaOfflineMsg() |
| | | { |
| | | return eventType == REPLICA_OFFLINE_HEARTBEAT; |
| | | } |
| | | |
| | | /** |
| | | * Creates a message from a provided byte array. |
| | | * |
| | | * @param in |
| | |
| | | public ChangeTimeHeartbeatMsg(byte[] in, short version) |
| | | throws DataFormatException |
| | | { |
| | | final ByteSequenceReader reader = ByteString.wrap(in).asReader(); |
| | | try |
| | | { |
| | | if (reader.get() != MSG_TYPE_CT_HEARTBEAT) |
| | | final ByteArrayScanner scanner = new ByteArrayScanner(in); |
| | | final byte msgType = scanner.nextByte(); |
| | | if (msgType != MSG_TYPE_CT_HEARTBEAT) |
| | | { |
| | | // Throw better exception below. |
| | | throw new IllegalArgumentException(); |
| | | throw new DataFormatException("input is not a valid " |
| | | + getClass().getSimpleName() + " message: " + msgType); |
| | | } |
| | | |
| | | if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7) |
| | | { |
| | | csn = CSN.valueOf(reader.getByteSequence(CSN.BYTE_ENCODING_LENGTH)); |
| | | } |
| | | else |
| | | { |
| | | csn = CSN.valueOf(reader.getString(CSN.STRING_ENCODING_LENGTH)); |
| | | reader.get(); // Read trailing 0 byte. |
| | | } |
| | | csn = version >= REPLICATION_PROTOCOL_V7 |
| | | ? scanner.nextCSN() |
| | | : scanner.nextCSNUTF8(); |
| | | eventType = version >= REPLICATION_PROTOCOL_V8 |
| | | ? scanner.nextByte() |
| | | : NORMAL_HEARTBEAT; |
| | | |
| | | if (reader.remaining() > 0) |
| | | if (!scanner.isEmpty()) |
| | | { |
| | | // Throw better exception below. |
| | | throw new IllegalArgumentException(); |
| | | throw new DataFormatException( |
| | | "Did not expect to find more bytes to read for " |
| | | + getClass().getSimpleName() + " message."); |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | catch (RuntimeException e) |
| | | { |
| | | // Index out of bounds, bad format, etc. |
| | | throw new DataFormatException("byte[] is not a valid CT_HEARTBEAT msg"); |
| | |
| | | @Override |
| | | public byte[] getBytes(short protocolVersion) |
| | | { |
| | | if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7) |
| | | if (protocolVersion < ProtocolVersion.REPLICATION_PROTOCOL_V7) |
| | | { |
| | | final ByteStringBuilder builder = new ByteStringBuilder( |
| | | CSN.BYTE_ENCODING_LENGTH + 1 /* type + csn */); |
| | | ByteArrayBuilder builder = new ByteArrayBuilder(bytes(1) + csnsUTF8(1)); |
| | | builder.append(MSG_TYPE_CT_HEARTBEAT); |
| | | csn.toByteString(builder); |
| | | builder.appendUTF8(csn); |
| | | return builder.toByteArray(); |
| | | } |
| | | else |
| | | |
| | | final ByteArrayBuilder builder = new ByteArrayBuilder(bytes(1) + csns(1)); |
| | | builder.append(MSG_TYPE_CT_HEARTBEAT); |
| | | builder.append(csn); |
| | | if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V8) |
| | | { |
| | | final ByteStringBuilder builder = new ByteStringBuilder( |
| | | CSN.STRING_ENCODING_LENGTH + 2 /* type + csn str + nul */); |
| | | builder.append(MSG_TYPE_CT_HEARTBEAT); |
| | | builder.append(csn.toString()); |
| | | builder.append((byte) 0); // For compatibility with earlier protocol |
| | | // versions. |
| | | return builder.toByteArray(); |
| | | builder.append(eventType); |
| | | } |
| | | return builder.toByteArray(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | { |
| | | return getClass().getSimpleName() + ", csn=" + csn.toStringUI(); |
| | | } |
| | | |
| | | } |
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | * Portions Copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | | |
| | | /** |
| | | * The version utility class for the replication protocol. |
| | | */ |
| | |
| | | public static final short REPLICATION_PROTOCOL_V1_REAL = 49; |
| | | /** |
| | | * The constant for the second version of the replication protocol. |
| | | * Add fields in the header for assured replication. |
| | | * <ul> |
| | | * <li>Add fields in the header for assured replication.</li> |
| | | * </ul> |
| | | */ |
| | | public static final short REPLICATION_PROTOCOL_V2 = 2; |
| | | |
| | | /** |
| | | * The constant for the 3rd version of the replication protocol. |
| | | * Add messages for remote ECL : not used as of today. |
| | | * <ul> |
| | | * <li>Add messages for remote ECL : not used as of today.</li> |
| | | * </ul> |
| | | */ |
| | | public static final short REPLICATION_PROTOCOL_V3 = 3; |
| | | |
| | | /** |
| | | * The constant for the 4th version of the replication protocol. |
| | | * - Add to the body of the ADD/MOD/MODDN/DEL msgs, a list of attribute for |
| | | * ECL entry attributes. |
| | | * - Modified algorithm for choosing a RS to connect to: introduction of a |
| | | * ReplicationServerDSMsg message. |
| | | * -> also added of the server URL in RSInfo of TopologyMsg |
| | | * - Introduction of a StopMsg for proper connections ending. |
| | | * - Initialization failover/flow control |
| | | * <ul> |
| | | * <li>Add to the body of the ADD/MOD/MODDN/DEL msgs, a list of attribute for |
| | | * ECL entry attributes.</li> |
| | | * <li>Modified algorithm for choosing a RS to connect to: introduction of a |
| | | * ReplicationServerDSMsg message.</li> |
| | | * <li>also added of the server URL in RSInfo of TopologyMsg</li> |
| | | * <li>Introduction of a StopMsg for proper connections ending.</li> |
| | | * <li>Initialization failover/flow control</li> |
| | | * </ul> |
| | | */ |
| | | public static final short REPLICATION_PROTOCOL_V4 = 4; |
| | | |
| | | /** |
| | | * The constant for the 5th version of the replication protocol. |
| | | * - Add support for wild-cards in change log included attributes |
| | | * - Add support for specifying additional included attributes for deletes |
| | | * - See OPENDJ-194. |
| | | * <ul> |
| | | * <li>Add support for wild-cards in change log included attributes</li> |
| | | * <li>Add support for specifying additional included attributes for deletes</li> |
| | | * <li>See OPENDJ-194.</li> |
| | | * </ul> |
| | | */ |
| | | public static final short REPLICATION_PROTOCOL_V5 = 5; |
| | | |
| | | /** |
| | | * The constant for the 6th version of the replication protocol. |
| | | * - include DS local URL in the DSInfo of TopologyMsg. |
| | | * <ul> |
| | | * <li>include DS local URL in the DSInfo of TopologyMsg.</li> |
| | | * </ul> |
| | | */ |
| | | public static final short REPLICATION_PROTOCOL_V6 = 6; |
| | | |
| | | /** |
| | | * The constant for the 7th version of the replication protocol. |
| | | * - compact encoding for length, CSNs, and server IDs. |
| | | * <ul> |
| | | * <li>compact encoding for length, CSNs, and server IDs.</li> |
| | | * </ul> |
| | | */ |
| | | public static final short REPLICATION_PROTOCOL_V7 = 7; |
| | | |
| | | /** |
| | | * The constant for the 8th version of the replication protocol. |
| | | * <ul> |
| | | * <li>ChangeTimeHeartbeatMsg now carries an event type byte, used to signal a replica going offline.</li> |
| | | * </ul> |
| | | */ |
| | | public static final short REPLICATION_PROTOCOL_V8 = 8; |
| | | |
| | | /** |
| | | * The replication protocol version used by the instance of RS/DS in this VM. |
| | | */ |
| | | private static final short CURRENT_VERSION = REPLICATION_PROTOCOL_V7; |
| | | private static final short CURRENT_VERSION = REPLICATION_PROTOCOL_V8; |
| | | |
| | | /** |
| | | * Gets the current version of the replication protocol. |
| | |
| | | * |
| | | * |
| | | * Copyright 2009 Sun Microsystems, Inc. |
| | | * Portions copyright 2013 ForgeRock AS. |
| | | * Portions copyright 2013-2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | |
| | | in[0]); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public byte[] getBytes(short protocolVersion) |
| | | { |
| | |
| | | MSG_TYPE_STOP |
| | | }; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName(); |
| | | } |
| | | } |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2013 ForgeRock AS |
| | | * Copyright 2013-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.types.DN; |
| | | |
| | | /** |
| | |
| | | private final Map<DN, Long> domainToGenerationId = new HashMap<DN, Long>(); |
| | | private final Map<DN, List<Integer>> domainToServerIds = |
| | | new HashMap<DN, List<Integer>>(); |
| | | private final Map<DN, List<CSN>> offlineReplicas = |
| | | new HashMap<DN, List<CSN>>(); |
| | | |
| | | /** |
| | | * Sets the generationId for the supplied replication domain. |
| | |
| | | } |
| | | |
| | | /** |
| | | * Records that a replica of the supplied domain went offline. The offline CSN |
| | | * is appended to the list kept for that domain; the list is created the first |
| | | * time a domain is seen. |
| | | * |
| | | * @param baseDN |
| | | * the baseDN of the offline replica |
| | | * @param offlineCSN |
| | | * the CSN (serverId + timestamp) of the offline replica |
| | | */ |
| | | public void addOfflineReplica(DN baseDN, CSN offlineCSN) |
| | | { |
| | | final List<CSN> knownCSNs = offlineReplicas.get(baseDN); |
| | | if (knownCSNs != null) |
| | | { |
| | | knownCSNs.add(offlineCSN); |
| | | return; |
| | | } |
| | | |
| | | // first offline replica recorded for this domain |
| | | final List<CSN> newCSNs = new LinkedList<CSN>(); |
| | | newCSNs.add(offlineCSN); |
| | | offlineReplicas.put(baseDN, newCSNs); |
| | | } |
| | | |
| | | /** |
| | | * Returns the Map of domainBaseDN => generationId. |
| | | * |
| | | * @return a Map of domainBaseDN => generationId |
| | |
| | | return domainToServerIds; |
| | | } |
| | | |
| | | /** |
| | | * Returns the offline replicas, as a Map of domain baseDN to the List of |
| | | * offline CSNs recorded for that domain. The internal map is returned as is, |
| | | * not a copy. |
| | | * |
| | | * @return a Map of domain baseDN to {@code List<CSN>} of offline CSNs. |
| | | */ |
| | | public Map<DN, List<CSN>> getOfflineReplicas() |
| | | { |
| | | return offlineReplicas; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "domainToGenerationId=" + domainToGenerationId |
| | | + ", domainToServerIds=" + domainToServerIds; |
| | | + ", domainToServerIds=" + domainToServerIds |
| | | + ", offlineReplicas=" + offlineReplicas; |
| | | } |
| | | } |
| | |
| | | * value received, and forwarding the message to the other RSes. |
| | | * @param senderHandler The handler for the server that sent the heartbeat. |
| | | * @param msg The message to process. |
| | | * @throws DirectoryException |
| | | * if a problem occurs |
| | | */ |
| | | void processChangeTimeHeartbeatMsg(ServerHandler senderHandler, |
| | | ChangeTimeHeartbeatMsg msg) |
| | | ChangeTimeHeartbeatMsg msg) throws DirectoryException |
| | | { |
| | | domainDB.replicaHeartbeat(baseDN, msg.getCSN()); |
| | | try |
| | | { |
| | | if (msg.isReplicaOfflineMsg()) |
| | | { |
| | | domainDB.replicaOffline(baseDN, msg.getCSN()); |
| | | } |
| | | else |
| | | { |
| | | domainDB.replicaHeartbeat(baseDN, msg.getCSN()); |
| | | } |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e |
| | | .getMessageObject(), e); |
| | | } |
| | | |
| | | if (senderHandler.isDataServer()) |
| | | { |
| | | /* |
| | |
| | | /** |
| | | * Processes a change time heartbeat msg. |
| | | * |
| | | * @param msg The message to be processed. |
| | | * @param msg |
| | | * The message to be processed. |
| | | * @throws DirectoryException |
| | | * When an exception is raised. |
| | | */ |
| | | void process(ChangeTimeHeartbeatMsg msg) |
| | | void process(ChangeTimeHeartbeatMsg msg) throws DirectoryException |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " |
| | | + replicationServerDomain.getLocalRSMonitorInstanceName() + " " |
| | | + this + " processes received msg:\n" + msg); |
| | | } |
| | | replicationServerDomain.processChangeTimeHeartbeatMsg(this, msg); |
| | | } |
| | | |
| | |
| | | // let's update the LDAP server with our current window size and hope
| | | // that everything will work better in the future.
| | | // TODO also log an error message. |
| | | WindowMsg msg = new WindowMsg(rcvWindow); |
| | | session.publish(msg); |
| | | } else |
| | | session.publish(new WindowMsg(rcvWindow)); |
| | | } |
| | | else |
| | | { |
| | | // Both the LDAP server and the replication server believe that the
| | | // window is closed. Let's check the flow control in case we
| | |
| | | * the replication domain baseDN |
| | | * @param offlineCSN |
| | | * The CSN (serverId and timestamp) for the replica's going offline |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | void replicaOffline(DN baseDN, CSN offlineCSN); |
| | | void replicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException; |
| | | } |
| | |
| | | |
| | | /** |
| | | * Holds the last time each replica was seen alive, whether via updates or |
| | | * heartbeats received. Data is held for each serverId cross domain. |
| | | * heartbeat notifications, or offline notifications. Data is held for each |
| | | * serverId cross domain. |
| | | * <p> |
| | | * Updates are persistent and stored in the replicaDBs, heartbeats are |
| | | * transient and are easily constructed on normal operations. |
| | |
| | | * @return true if the provided baseDN is enabled for the external changelog, |
| | | * false if the provided baseDN is disabled for the external changelog |
| | | * or unknown to multimaster replication. |
| | | * @see MultimasterReplication#isECLEnabledDomain(DN) |
| | | */ |
| | | protected boolean isECLEnabledDomain(DN baseDN) |
| | | { |
| | |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | |
| | | for (Entry<DN, List<CSN>> entry : changelogState.getOfflineReplicas() |
| | | .entrySet()) |
| | | { |
| | | final DN baseDN = entry.getKey(); |
| | | final List<CSN> offlineCSNs = entry.getValue(); |
| | | for (CSN offlineCSN : offlineCSNs) |
| | | { |
| | | if (isECLEnabledDomain(baseDN)) |
| | | { |
| | | replicasOffline.update(baseDN, offlineCSN); |
| | | } |
| | | } |
| | | } |
| | | |
| | | // this will not be used any more. Discard for garbage collection. |
| | | this.changelogState = null; |
| | | } |
| | |
| | | } |
| | | |
| | | private void moveForwardMediumConsistencyPoint(final CSN mcCSN, |
| | | final DN mcBaseDN) |
| | | final DN mcBaseDN) throws ChangelogException |
| | | { |
| | | // update, so it becomes the previous cookie for the next change |
| | | mediumConsistencyRUV.update(mcBaseDN, mcCSN); |
| | | mediumConsistency = Pair.of(mcBaseDN, mcCSN); |
| | | final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcCSN.getServerId()); |
| | | if (offlineCSN != null |
| | | && offlineCSN.isOlderThan(mcCSN) |
| | | // If no new updates has been seen for this replica |
| | | && lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN)) |
| | | final int mcServerId = mcCSN.getServerId(); |
| | | final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId); |
| | | final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId); |
| | | if (offlineCSN != null) |
| | | { |
| | | removeCursor(mcBaseDN, mcCSN); |
| | | replicasOffline.removeCSN(mcBaseDN, offlineCSN); |
| | | mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN); |
| | | if (lastAliveCSN != null && offlineCSN.isOlderThan(lastAliveCSN)) |
| | | { |
| | | // replica is back online, we can forget the last time it was offline |
| | | replicasOffline.removeCSN(mcBaseDN, offlineCSN); |
| | | } |
| | | else if (offlineCSN.isOlderThan(mcCSN)) |
| | | { |
| | | /* |
| | | * replica is not back online and Medium consistency point has gone past |
| | | * its last offline time: remove everything known about it: cursor, |
| | | * offlineCSN from lastAliveCSN and remove all knowledge of this replica |
| | | * from the medium consistency RUV. |
| | | */ |
| | | removeCursor(mcBaseDN, mcCSN); |
| | | lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN); |
| | | mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | private void removeCursor(final DN baseDN, final CSN csn) |
| | | throws ChangelogException |
| | | { |
| | | for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1 |
| | | : allCursors.entrySet()) |
| | |
| | | { |
| | | iter.remove(); |
| | | StaticUtils.close(entry2.getValue()); |
| | | resetNextChangeForInsertDBCursor(); |
| | | return; |
| | | } |
| | | } |
| | |
| | | /** {@inheritDoc} */
| | | @Override
| | | public void replicaOffline(DN baseDN, CSN offlineCSN)
| | |     throws ChangelogException
| | | {
| | |   // Hand the offline CSN to the DB environment before notifying the indexer.
| | |   // TODO save this state in the changelogStateDB?
| | |   dbEnv.addOfflineReplica(baseDN, offlineCSN);
| | |
| | |   final ChangeNumberIndexer runningIndexer = cnIndexer.get();
| | |   if (runningIndexer == null)
| | |   {
| | |     // no indexer is currently set: nothing else to notify
| | |     return;
| | |   }
| | |   runningIndexer.replicaOffline(baseDN, offlineCSN);
| | | }
| | | |
| | | /** |
| | |
| | | // wait a bit before purging more |
| | | return DEFAULT_SLEEP; |
| | | } |
| | | |
| | | /**
| | | * {@inheritDoc}
| | | * <p>
| | | * After delegating to the superclass, this thread is also interrupted.
| | | */
| | | @Override
| | | public void initiateShutdown()
| | | {
| | | super.initiateShutdown();
| | | this.interrupt(); // wake up the purger thread for faster shutdown
| | | }
| | | } |
| | | } |
| | |
| | | synchronized (this) |
| | | { |
| | | closeCursor(); |
| | | // previously exhausted cursor must be able to reinitialize themselves |
| | | // Previously exhausted cursor must be able to reinitialize themselves. |
| | | // There is a risk of readLock never being unlocked |
| | | // if following code is called while the cursor is closed. |
| | | // It is better to let the deadlock happen to help quickly identifying |
| | | // and fixing such issue with unit tests. |
| | | cursor = db.openReadCursor(lastNonNullCurrentCSN); |
| | | currentChange = cursor.next(); |
| | | if (currentChange != null) |
| | |
| | | } |
| | | |
| | | /** |
| | | * Called by the Gc when the object is garbage collected Release the internal |
| | | * Called by the Gc when the object is garbage collected. Release the internal |
| | | * cursor in case the cursor was badly used and {@link #close()} was never |
| | | * called. |
| | | */ |
| | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | |
| | |
| | | private ReplicationServer replicationServer; |
| | | private final AtomicBoolean isShuttingDown = new AtomicBoolean(false); |
| | | private static final String GENERATION_ID_TAG = "GENID"; |
| | | private static final String OFFLINE_TAG = "OFFLINE"; |
| | | private static final String FIELD_SEPARATOR = " "; |
| | | /** The tracer object for the debug logger. */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | |
| | | } |
| | | result.setDomainGenerationId(baseDN, generationId); |
| | | } |
| | | else if (prefix.equals(OFFLINE_TAG)) |
| | | { |
| | | final String[] str = stringKey.split(FIELD_SEPARATOR, 3); |
| | | final int serverId = toInt(str[1]); |
| | | final DN baseDN = DN.decode(str[2]); |
| | | long timestamp = ByteString.wrap(entry.getValue()).asReader().getLong(); |
| | | if (debugEnabled()) |
| | | { |
| | | debug("has read replica offline: baseDN=" + baseDN + " serverId=" |
| | | + serverId); |
| | | } |
| | | result.addOfflineReplica(baseDN, new CSN(timestamp, 0, serverId)); |
| | | } |
| | | else |
| | | { |
| | | final String[] str = stringData.split(FIELD_SEPARATOR, 2); |
| | |
| | | final DN baseDN = DN.decode(str[1]); |
| | | if (debugEnabled()) |
| | | { |
| | | debug("has read replica: baseDN=" + baseDN + " serverId=" + serverId); |
| | | debug("has read replica: baseDN=" + baseDN + " serverId=" |
| | | + serverId); |
| | | } |
| | | result.addServerIdToDomain(serverId, baseDN); |
| | | } |
| | |
| | | // Opens the DB for the changes received from this server on this domain. |
| | | final Database replicaDB = openDatabase(replicaEntry.getKey()); |
| | | |
| | | putInChangelogStateDBIfNotExist(replicaEntry); |
| | | putInChangelogStateDBIfNotExist(toByteArray(replicaEntry)); |
| | | putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId)); |
| | | return replicaDB; |
| | | } |
| | |
| | | * the replica's serverId |
| | | * @return a database entry for the replica |
| | | */ |
| | | Entry<String, String> toReplicaEntry(DN baseDN, int serverId) |
| | | static Entry<String, String> toReplicaEntry(DN baseDN, int serverId) |
| | | { |
| | | final String key = serverId + FIELD_SEPARATOR + baseDN.toNormalizedString(); |
| | | return new SimpleImmutableEntry<String, String>(key, key); |
| | |
| | | * the domain's generationId |
| | | * @return a database entry for the generationId |
| | | */ |
| | | Entry<String, String> toGenIdEntry(DN baseDN, long generationId) |
| | | static Entry<byte[], byte[]> toGenIdEntry(DN baseDN, long generationId) |
| | | { |
| | | final String normDn = baseDN.toNormalizedString(); |
| | | final String key = GENERATION_ID_TAG + FIELD_SEPARATOR + normDn; |
| | | final String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId |
| | | + FIELD_SEPARATOR + normDn; |
| | | return new SimpleImmutableEntry<String, String>(key, data); |
| | | return new SimpleImmutableEntry<byte[], byte[]>(toBytes(key),toBytes(data)); |
| | | } |
| | | |
| | | private void putInChangelogStateDBIfNotExist(Entry<String, String> entry) |
| | | throws RuntimeException |
| | | /**
| | | * Converts an {@code Entry<String, String>} to an
| | | * {@code Entry<byte[], byte[]>} by passing both the key and the value
| | | * through {@code toBytes}.
| | | *
| | | * @param entry
| | | * the entry to convert
| | | * @return the converted entry
| | | */
| | | static Entry<byte[], byte[]> toByteArray(Entry<String, String> entry)
| | | {
| | |   return new SimpleImmutableEntry<byte[], byte[]>(
| | |       toBytes(entry.getKey()),
| | |       toBytes(entry.getValue()));
| | | }
| | | |
| | | /**
| | | * Builds the changelog state database entry recording the time a replica
| | | * went offline.
| | | * <p>
| | | * The key is made of the offline tag, the replica's serverId and the
| | | * normalized baseDN, joined by the field separator. The value holds the
| | | * offline timestamp as a long. Only the serverId and the time of the CSN
| | | * are written to the entry.
| | | *
| | | * @param baseDN
| | | * the replica's baseDN
| | | * @param offlineCSN
| | | * the replica's serverId and offline timestamp
| | | * @return a database entry representing the time a replica went offline
| | | */
| | | static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN)
| | | {
| | |   final String keyString = OFFLINE_TAG + FIELD_SEPARATOR
| | |       + offlineCSN.getServerId() + FIELD_SEPARATOR
| | |       + baseDN.toNormalizedString();
| | |   final byte[] keyBytes = toBytes(keyString);
| | |
| | |   final ByteStringBuilder value = new ByteStringBuilder(8); // room for one long
| | |   value.append(offlineCSN.getTime());
| | |   return new SimpleImmutableEntry<byte[], byte[]>(keyBytes,
| | |       value.toByteArray());
| | | }
| | | |
| | | private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry) |
| | | throws ChangelogException, RuntimeException |
| | | { |
| | | DatabaseEntry key = new DatabaseEntry(entry.getKey()); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == NOTFOUND) |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try |
| | | { |
| | | data.setData(toBytes(entry.getValue())); |
| | | data.setData(entry.getValue()); |
| | | if (debugEnabled()) |
| | | { |
| | | debug("putting record in the changelogstate Db key=[" |
| | | + entry.getKey() + "] value=[" + entry.getValue() + "]"); |
| | | + toString(entry.getKey()) + "] value=[" |
| | | + toString(entry.getValue()) + "]"); |
| | | } |
| | | changelogStateDb.put(txn, key, data); |
| | | txn.commit(Durability.COMMIT_WRITE_NO_SYNC); |
| | |
| | | */ |
| | | public void clearServerId(DN baseDN, int serverId) throws ChangelogException |
| | | { |
| | | deleteFromChangelogStateDB(toReplicaEntry(baseDN, serverId), |
| | | deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)), |
| | | "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")"); |
| | | } |
| | | |
| | | private void deleteFromChangelogStateDB(Entry<String, ?> entry, |
| | | private void deleteFromChangelogStateDB(Entry<byte[], ?> entry, |
| | | String methodInvocation) throws ChangelogException |
| | | { |
| | | if (debugEnabled()) |
| | |
| | | |
| | | try |
| | | { |
| | | final DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey())); |
| | | final DatabaseEntry key = new DatabaseEntry(entry.getKey()); |
| | | final DatabaseEntry data = new DatabaseEntry(); |
| | | if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == SUCCESS) |
| | | { |
| | |
| | | throw dbe; |
| | | } |
| | | } |
| | | else |
| | | else if (debugEnabled()) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | debug(methodInvocation + " failed: key=[ " + entry.getKey() |
| | | + "] not found"); |
| | | } |
| | | debug(methodInvocation + " failed: key not found"); |
| | | } |
| | | } |
| | | catch (RuntimeException dbe) |
| | | catch (RuntimeException e) |
| | | { |
| | | throw new ChangelogException(dbe); |
| | | if (debugEnabled()) |
| | | { |
| | | debug(methodInvocation + " error: " + stackTraceToSingleLineString(e)); |
| | | } |
| | | throw new ChangelogException(e); |
| | | } |
| | | } |
| | | |
| | | /**
| | | * Stores the information about an offline replica in the changelog state DB.
| | | *
| | | * @param baseDN
| | | * the domain of the offline replica
| | | * @param offlineCSN
| | | * the offline replica serverId and offline timestamp
| | | * @throws ChangelogException
| | | * if a database problem occurred
| | | */
| | | public void addOfflineReplica(DN baseDN, CSN offlineCSN)
| | |     throws ChangelogException
| | | {
| | |   // An older entry stored under the same key is simply overwritten: a newly
| | |   // received offline CSN is assumed to be newer than the previous one.
| | |   final Entry<byte[], byte[]> offlineEntry =
| | |       toReplicaOfflineEntry(baseDN, offlineCSN);
| | |   final String invocation =
| | |       "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")";
| | |   putInChangelogStateDB(offlineEntry, invocation);
| | | }
| | | |
| | | /**
| | | * Puts the supplied entry into the changelog state DB, passing no explicit
| | | * transaction, and traces the start, success or failure of the operation
| | | * when debug is enabled.
| | | *
| | | * @param entry
| | | * the key and value to store
| | | * @param methodInvocation
| | | * text identifying the calling operation, used as prefix of the
| | | * debug messages
| | | * @throws ChangelogException
| | | * wrapping any RuntimeException raised while storing the entry
| | | */
| | | private void putInChangelogStateDB(Entry<byte[], byte[]> entry,
| | |     String methodInvocation) throws ChangelogException
| | | {
| | |   if (debugEnabled())
| | |   {
| | |     debug(methodInvocation + " starting");
| | |   }
| | |
| | |   try
| | |   {
| | |     changelogStateDb.put(null,
| | |         new DatabaseEntry(entry.getKey()),
| | |         new DatabaseEntry(entry.getValue()));
| | |     if (debugEnabled())
| | |     {
| | |       debug(methodInvocation + " succeeded");
| | |     }
| | |   }
| | |   catch (RuntimeException cause)
| | |   {
| | |     if (debugEnabled())
| | |     {
| | |       debug(methodInvocation + " error: "
| | |           + stackTraceToSingleLineString(cause));
| | |     }
| | |     throw new ChangelogException(cause);
| | |   }
| | | }
| | | |
| | |
| | | * |
| | | * |
| | | * Copyright 2009 Sun Microsystems, Inc. |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | * Portions Copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.service; |
| | | |
| | |
| | | import org.opends.server.replication.protocol.Session; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.opends.server.util.TimeThread; |
| | | |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | |
| | |
| | | * @param session The session on which heartbeats are to be sent. |
| | | * @param heartbeatInterval The interval between heartbeats sent |
| | | * (in milliseconds). |
| | | * @param serverId2 The serverId of the sender domain. |
| | | * @param serverId The serverId of the sender domain. |
| | | */ |
| | | public CTHeartbeatPublisherThread(String threadName, Session session, |
| | | long heartbeatInterval, int serverId2) |
| | | long heartbeatInterval, int serverId) |
| | | { |
| | | super(threadName); |
| | | this.session = session; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | this.serverId = serverId2; |
| | | this.serverId = serverId; |
| | | } |
| | | |
| | | /** |
| | |
| | | @Override |
| | | public void run() |
| | | { |
| | | long lastHeartbeatTime = 0; |
| | | try |
| | | { |
| | | if (debugEnabled()) |
| | |
| | | |
| | | while (!shutdown) |
| | | { |
| | | long now = System.currentTimeMillis(); |
| | | final CSN csn = new CSN(TimeThread.getTime(), 0, serverId); |
| | | ChangeTimeHeartbeatMsg ctHeartbeatMsg = new ChangeTimeHeartbeatMsg(csn); |
| | | |
| | | final long now = System.currentTimeMillis(); |
| | | if (now > session.getLastPublishTime() + heartbeatInterval) |
| | | { |
| | | session.publish(ctHeartbeatMsg); |
| | | final CSN csn = new CSN(now, 0, serverId); |
| | | session.publish(ChangeTimeHeartbeatMsg.heartbeatMsg(csn)); |
| | | lastHeartbeatTime = csn.getTime(); |
| | | } |
| | | |
| | | long sleepTime = session.getLastPublishTime() + heartbeatInterval - now; |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | if (shutdown) |
| | | { |
| | | /* |
| | | * Shortcoming: this thread is restarted each time the DS reconnects, |
| | | * e.g. during load balancing. This is not that much of a problem |
| | | * because the ChangeNumberIndexer tolerates receiving replica offline |
| | | * heartbeats and then receiving messages back again. |
| | | */ |
| | | /* |
| | | * However, during shutdown we need to be sure that all pending client |
| | | * operations have either completed or have been aborted before shutting |
| | | * down replication. Otherwise, the medium consistency will move forward |
| | | * without knowing about these changes. |
| | | */ |
| | | final long now = System.currentTimeMillis(); |
| | | final int seqNum = lastHeartbeatTime == now ? 1 : 0; |
| | | final CSN offlineCSN = new CSN(now, seqNum, serverId); |
| | | session.publish(ChangeTimeHeartbeatMsg.replicaOfflineMsg(offlineCSN)); |
| | | } |
| | | } |
| | | catch (IOException e) |
| | | { |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.Arrays; |
| | | import java.util.Collection; |
| | | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | | import org.testng.Assert; |
| | | import org.testng.annotations.Test; |
| | | |
| | | /**
| | | * Round-trip test for the {@link ByteArrayBuilder} and
| | | * {@link ByteArrayScanner} classes.
| | | */
| | | @SuppressWarnings("javadoc")
| | | public class ByteArrayTest extends DirectoryServerTestCase
| | | {
| | |
| | |   /**
| | |    * Encodes one value of each supported kind with the builder, then checks
| | |    * that the scanner reads the same values back in the same order and ends
| | |    * up empty.
| | |    */
| | |   @Test
| | |   public void testBuilderAppendMethodsAndScannerNextMethods() throws Exception
| | |   {
| | |     // The declared types matter: they select the append() overloads.
| | |     final boolean aBoolean = true;
| | |     final byte aByte = 80;
| | |     final short aShort = 42;
| | |     final int anInt = aShort + 1;
| | |     final long aLong = anInt + 1;
| | |     final String aString = "Yay!";
| | |     final Collection<String> strings = Arrays.asList("foo", "bar", "baz");
| | |     final CSN aCsn = new CSN(42424242, 13, 42);
| | |
| | |     final byte[] encoded = new ByteArrayBuilder()
| | |         .append(aBoolean)
| | |         .append(aByte)
| | |         .append(aShort)
| | |         .append(anInt)
| | |         .append(aLong)
| | |         .append(aString)
| | |         .appendStrings(strings)
| | |         .appendUTF8(anInt)
| | |         .appendUTF8(aLong)
| | |         .append(aCsn)
| | |         .appendUTF8(aCsn)
| | |         .toByteArray();
| | |
| | |     // Decode in exactly the order used for encoding.
| | |     final ByteArrayScanner decoder = new ByteArrayScanner(encoded);
| | |     Assert.assertEquals(decoder.nextBoolean(), aBoolean);
| | |     Assert.assertEquals(decoder.nextByte(), aByte);
| | |     Assert.assertEquals(decoder.nextShort(), aShort);
| | |     Assert.assertEquals(decoder.nextInt(), anInt);
| | |     Assert.assertEquals(decoder.nextLong(), aLong);
| | |     Assert.assertEquals(decoder.nextString(), aString);
| | |     Assert.assertEquals(decoder.nextStrings(new ArrayList<String>()), strings);
| | |     Assert.assertEquals(decoder.nextIntUTF8(), anInt);
| | |     Assert.assertEquals(decoder.nextLongUTF8(), aLong);
| | |     Assert.assertEquals(decoder.nextCSN(), aCsn);
| | |     Assert.assertEquals(decoder.nextCSNUTF8(), aCsn);
| | |     Assert.assertTrue(decoder.isEmpty());
| | |   }
| | | }
| | |
| | | import static org.opends.server.TestCaseUtils.*; |
| | | import static org.opends.server.replication.protocol.OperationContext.*; |
| | | import static org.opends.server.replication.protocol.ProtocolVersion.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | import static org.testng.Assert.*; |
| | | |
| | | /** |
| | |
| | | List<Modification> mods4 = new ArrayList<Modification>(); |
| | | for (int i = 0; i < 10; i++) |
| | | { |
| | | Attribute attr = Attributes.create("description", "string" |
| | | + String.valueOf(i)); |
| | | Modification mod = new Modification(ModificationType.ADD, attr); |
| | | mods4.add(mod); |
| | | Attribute attr = Attributes.create("description", "string" + i); |
| | | mods4.add(new Modification(ModificationType.ADD, attr)); |
| | | } |
| | | |
| | | Attribute attr5 = Attributes.create("namingcontexts", TEST_ROOT_DN_STRING); |
| | |
| | | msg.setAssuredMode(assuredMode); |
| | | msg.setSafeDataLevel(safeDataLevel); |
| | | |
| | | // Set ECL entry inlcuded attributes |
| | | // Set ECL entry included attributes |
| | | if (entryAttrList != null) |
| | | { |
| | | msg.setEclIncludes(entryAttrList); |
| | | } |
| | | |
| | | ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg.generateMsg( |
| | | msg.getBytes(), ProtocolVersion.getCurrentVersion()); |
| | | msg.getBytes(), getCurrentVersion()); |
| | | |
| | | // Test that generated attributes match original attributes. |
| | | assertEquals(generatedMsg.isAssured(), isAssured); |
| | |
| | | |
| | | // Check equals |
| | | ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg.generateMsg( |
| | | msg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1); |
| | | msg.getBytes(), REPLICATION_PROTOCOL_V1); |
| | | assertFalse(msg.equals(null)); |
| | | assertFalse(msg.equals(new Object())); |
| | | |
| | |
| | | } |
| | | msg.setInitiatorsName("johnny h"); |
| | | DeleteMsg generatedMsg = (DeleteMsg) ReplicationMsg.generateMsg( |
| | | msg.getBytes(), ProtocolVersion.getCurrentVersion()); |
| | | msg.getBytes(), getCurrentVersion()); |
| | | |
| | | assertEquals(msg.toString(), generatedMsg.toString()); |
| | | assertEquals(msg.getInitiatorsName(), generatedMsg.getInitiatorsName()); |
| | |
| | | msg.setEclIncludes(entryAttrList); |
| | | } |
| | | |
| | | ModifyDNMsg generatedMsg = (ModifyDNMsg) ReplicationMsg |
| | | .generateMsg(msg.getBytes(), ProtocolVersion.getCurrentVersion()); |
| | | ModifyDNMsg generatedMsg = (ModifyDNMsg) ReplicationMsg.generateMsg( |
| | | msg.getBytes(), getCurrentVersion()); |
| | | |
| | | // Test that generated attributes match original attributes. |
| | | assertEquals(generatedMsg.isAssured(), isAssured); |
| | |
| | | msg.setEclIncludes(entryAttrList); |
| | | } |
| | | |
| | | AddMsg generatedMsg = (AddMsg) ReplicationMsg.generateMsg(msg |
| | | .getBytes(), ProtocolVersion.getCurrentVersion()); |
| | | AddMsg generatedMsg = (AddMsg) ReplicationMsg.generateMsg( |
| | | msg.getBytes(), getCurrentVersion()); |
| | | assertEquals(generatedMsg.getBytes(), msg.getBytes()); |
| | | assertEquals(generatedMsg.toString(), msg.toString()); |
| | | |
| | |
| | | new StopMsg(msg.getBytes(getCurrentVersion())); |
| | | } |
| | | |
| | | /**
| | | * Checks the "replica offline" flag of change time heartbeat messages after
| | | * an encode/decode round trip in protocol versions V1, V7 and V8. The flag
| | | * is expected to be true only for a replica offline message in V8.
| | | */
| | | @Test
| | | public void changeTimeHeartbeatMsgTest() throws Exception
| | | {
| | |   final short[] versionsBeforeV8 =
| | |       { REPLICATION_PROTOCOL_V1, REPLICATION_PROTOCOL_V7 };
| | |   final short latestChecked = REPLICATION_PROTOCOL_V8;
| | |
| | |   final CSN csn = new CSN(System.currentTimeMillis(), 0, 42);
| | |
| | |   final ChangeTimeHeartbeatMsg plainHeartbeat =
| | |       ChangeTimeHeartbeatMsg.heartbeatMsg(csn);
| | |   for (short version : versionsBeforeV8)
| | |   {
| | |     assertCTHearbeatMsg(plainHeartbeat, version, false);
| | |   }
| | |   assertCTHearbeatMsg(plainHeartbeat, latestChecked, false);
| | |
| | |   final ChangeTimeHeartbeatMsg replicaOffline =
| | |       ChangeTimeHeartbeatMsg.replicaOfflineMsg(csn);
| | |   for (short version : versionsBeforeV8)
| | |   {
| | |     assertCTHearbeatMsg(replicaOffline, version, false);
| | |   }
| | |   assertCTHearbeatMsg(replicaOffline, latestChecked, true);
| | | }
| | | |
| | | private void assertCTHearbeatMsg(ChangeTimeHeartbeatMsg heartbeatMsg, |
| | | short version, boolean expected) throws DataFormatException |
| | | { |
| | | final byte[] bytes = heartbeatMsg.getBytes(version); |
| | | ChangeTimeHeartbeatMsg decodedMsg = new ChangeTimeHeartbeatMsg(bytes, version); |
| | | assertEquals(decodedMsg.isReplicaOfflineMsg(), expected); |
| | | } |
| | | |
| | | /** |
| | | * Test that WindowMsg encoding and decoding works |
| | | * by checking that : msg == new WindowMsg(msg.getBytes()). |
| | |
| | | throws Exception |
| | | { |
| | | TopologyMsg msg = new TopologyMsg(dsList, rsList); |
| | | TopologyMsg newMsg = new TopologyMsg(msg.getBytes(getCurrentVersion()), |
| | | ProtocolVersion.getCurrentVersion()); |
| | | TopologyMsg newMsg = new TopologyMsg(msg.getBytes(getCurrentVersion()), getCurrentVersion()); |
| | | assertEquals(msg.getReplicaInfos(), newMsg.getReplicaInfos()); |
| | | assertEquals(msg.getRsInfos(), newMsg.getRsInfos()); |
| | | } |
| | |
| | | msg.setServerState(sid3, s3, now+3, false); |
| | | |
| | | byte[] b = msg.getBytes(getCurrentVersion()); |
| | | MonitorMsg newMsg = new MonitorMsg(b, ProtocolVersion.getCurrentVersion()); |
| | | MonitorMsg newMsg = new MonitorMsg(b, getCurrentVersion()); |
| | | |
| | | assertEquals(rsState, msg.getReplServerDbState()); |
| | | assertEquals(newMsg.getReplServerDbState().toString(), |
| | | msg.getReplServerDbState().toString()); |
| | | |
| | | Iterator<Integer> it = newMsg.ldapIterator(); |
| | | while (it.hasNext()) |
| | | for (int sid : toIterable(newMsg.ldapIterator())) |
| | | { |
| | | int sid = it.next(); |
| | | ServerState s = newMsg.getLDAPServerState(sid); |
| | | if (sid == sid1) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | Iterator<Integer> it2 = newMsg.rsIterator(); |
| | | while (it2.hasNext()) |
| | | for (int sid : toIterable(newMsg.rsIterator())) |
| | | { |
| | | int sid = it2.next(); |
| | | ServerState s = newMsg.getRSServerState(sid); |
| | | if (sid == sid3) |
| | | { |
| | |
| | | encodemsg += (t4 - t31); |
| | | |
| | | // getBytes |
| | | byte[] bytes = generatedMsg.getBytes(ProtocolVersion.getCurrentVersion()); |
| | | byte[] bytes = generatedMsg.getBytes(getCurrentVersion()); |
| | | t5 = System.nanoTime(); |
| | | getbytes += (t5 - t4); |
| | | |
| | |
| | | encodemsg += (t4 - t31); |
| | | |
| | | // getBytes |
| | | byte[] bytes = generatedMsg.getBytes(ProtocolVersion.getCurrentVersion()); |
| | | byte[] bytes = generatedMsg.getBytes(getCurrentVersion()); |
| | | t5 = System.nanoTime(); |
| | | getbytes += (t5 - t4); |
| | | |
| | |
| | | encodemsg += (t4 - t31); |
| | | |
| | | // getBytes |
| | | byte[] bytes = generatedMsg.getBytes(ProtocolVersion.getCurrentVersion()); |
| | | byte[] bytes = generatedMsg.getBytes(getCurrentVersion()); |
| | | t5 = System.nanoTime(); |
| | | getbytes += (t5 - t4); |
| | | |
| | |
| | | |
| | | final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4); |
| | | publishUpdateMsg(msg4); |
| | | // MCP moved forward after receiving update from serverId1 |
| | | // MCP moves forward after receiving update from serverId1 |
| | | // (last replica in the domain) |
| | | assertExternalChangelogContent(msg1, msg2, msg4); |
| | | |
| | | // serverId2 comes online again |
| | | final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId2, 5); |
| | | publishUpdateMsg(msg5); |
| | | // MCP does not move until it knows what happens to serverId1 |
| | | assertExternalChangelogContent(msg1, msg2, msg4); |
| | | sendHeartbeat(BASE_DN1, serverId1, 6); |
| | | // MCP moves forward |
| | | assertExternalChangelogContent(msg1, msg2, msg4, msg5); |
| | | } |
| | | |
| | | private void addReplica(DN baseDN, int serverId) throws Exception |
| | |
| | | waitForWaitingState(cnIndexer); |
| | | } |
| | | |
| | | private void stopCNIndexer() |
| | | private void stopCNIndexer() throws Exception |
| | | { |
| | | if (cnIndexer != null) |
| | | { |
| | | cnIndexer.initiateShutdown(); |
| | | cnIndexer.interrupt(); |
| | | cnIndexer.join(); |
| | | } |
| | | } |
| | | |
| | |
| | | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.DN; |
| | |
| | | import com.sleepycat.je.Environment; |
| | | |
| | | import static java.util.Arrays.*; |
| | | import static java.util.Collections.*; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.replication.server.changelog.je.ReplicationDbEnv.*; |
| | |
| | | public class ReplicationDbEnvTest extends DirectoryServerTestCase |
| | | { |
| | | |
| | | /** |
| | | * Bypass heavyweight setup: passes {@code null} for both superclass constructor |
| | | * arguments and overrides the two open* methods below so that they return {@code null}. |
| | | */ |
| | | private final class TestableReplicationDbEnv extends ReplicationDbEnv |
| | | { |
| | | private TestableReplicationDbEnv() throws ChangelogException |
| | | { |
| | | super(null, null); |
| | | } |
| | | /** |
| | | * Bypass heavyweight setup: passes {@code null} for both superclass constructor |
| | | * arguments and overrides the two open* methods below so that they return {@code null}. |
| | | */ |
| | | private final class TestableReplicationDbEnv extends ReplicationDbEnv |
| | | { |
| | | private TestableReplicationDbEnv() throws ChangelogException |
| | | { |
| | | super(null, null); |
| | | } |
| | | |
| | | /** Always returns {@code null}; the {@code path} argument is ignored. */ |
| | | @Override |
| | | protected Environment openJEEnvironment(String path) |
| | | { |
| | | return null; |
| | | } |
| | | /** Always returns {@code null}; the {@code path} argument is ignored. */ |
| | | @Override |
| | | protected Environment openJEEnvironment(String path) |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | /** Always returns {@code null}; the {@code databaseName} argument is ignored. */ |
| | | @Override |
| | | protected Database openDatabase(String databaseName) |
| | | throws ChangelogException, RuntimeException |
| | | { |
| | | return null; |
| | | } |
| | | } |
| | | /** Always returns {@code null}; the {@code databaseName} argument is ignored. */ |
| | | @Override |
| | | protected Database openDatabase(String databaseName) |
| | | throws ChangelogException, RuntimeException |
| | | { |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** Annotated {@code @BeforeClass}: starts the fake server through {@code TestCaseUtils}. */ |
| | | @BeforeClass |
| | | public void setup() throws Exception |
| | | { |
| | | TestCaseUtils.startFakeServer(); |
| | | } |
| | | /** Annotated {@code @BeforeClass}: starts the fake server through {@code TestCaseUtils}. */ |
| | | @BeforeClass |
| | | public void setup() throws Exception |
| | | { |
| | | TestCaseUtils.startFakeServer(); |
| | | } |
| | | |
| | | /** Annotated {@code @AfterClass}: shuts the fake server down through {@code TestCaseUtils}. */ |
| | | @AfterClass |
| | | public void teardown() |
| | | { |
| | | TestCaseUtils.shutdownFakeServer(); |
| | | } |
| | | /** Annotated {@code @AfterClass}: shuts the fake server down through {@code TestCaseUtils}. */ |
| | | @AfterClass |
| | | public void teardown() |
| | | { |
| | | TestCaseUtils.shutdownFakeServer(); |
| | | } |
| | | |
| | | /** |
| | | * Supplies the rows consumed by the test that names this provider. |
| | | * Each row holds: base DN, generation id, list of server ids. |
| | | */ |
| | | @DataProvider |
| | | public Object[][] changelogStateDataProvider() throws Exception |
| | | { |
| | | return new Object[][] { |
| | | { DN.decode("dc=example,dc=com"), 524157415, asList(42, 346) }, |
| | | // test with a space in the baseDN (space is the field separator in the DB) |
| | | { DN.decode("cn=admin data"), 524157415, asList(42, 346) }, |
| | | }; |
| | | } |
| | | /** |
| | | * Supplies the rows consumed by the test that names this provider. |
| | | * Each row holds: base DN, generation id, list of replica server ids, list of CSNs |
| | | * for offline replicas. |
| | | */ |
| | | @DataProvider |
| | | public Object[][] changelogStateDataProvider() throws Exception |
| | | { |
| | | final int genId = 524157415; |
| | | final int id1 = 42; |
| | | final int id2 = 346; |
| | | final int t1 = 1956245524; |
| | | return new Object[][] { |
| | | // generation id only: no replica and no offline replica |
| | | { DN.decode("dc=example,dc=com"), genId, EMPTY_LIST, EMPTY_LIST }, |
| | | // NOTE(review): the names suggest id2 is a server id and t1 a time; check that |
| | | // (id2, 0, t1) is the order the CSN constructor expects - TODO confirm |
| | | { DN.decode("dc=example,dc=com"), genId, asList(id1, id2), |
| | | asList(new CSN(id2, 0, t1)) }, |
| | | // test with a space in the baseDN (space is the field separator in the |
| | | // DB) |
| | | { DN.decode("cn=admin data"), genId, asList(id1, id2), EMPTY_LIST }, }; |
| | | } |
| | | |
| | | /** |
| | | * Round-trip check: puts the generation id entry and one replica entry per server id |
| | | * into a map, decodes that map with {@code decodeChangelogState()} and compares the |
| | | * decoded {@code ChangelogState} with the inputs. |
| | | */ |
| | | @Test(dataProvider = "changelogStateDataProvider") |
| | | public void encodeDecodeChangelogState(DN baseDN, long generationId, |
| | | List<Integer> serverIds) throws Exception |
| | | { |
| | | final ReplicationDbEnv changelogStateDB = new TestableReplicationDbEnv(); |
| | | /** |
| | | * Round-trip check: puts the generation id entry, one replica entry per server id and |
| | | * one offline replica entry per CSN into a map, decodes that map with |
| | | * {@code decodeChangelogState()} and compares the decoded {@code ChangelogState} |
| | | * with the inputs. |
| | | */ |
| | | @Test(dataProvider = "changelogStateDataProvider") |
| | | public void encodeDecodeChangelogState(DN baseDN, long generationId, |
| | | List<Integer> replicas, List<CSN> offlineReplicas) throws Exception |
| | | { |
| | | final ReplicationDbEnv changelogStateDB = new TestableReplicationDbEnv(); |
| | | |
| | | // encode data |
| | | final Map<byte[], byte[]> wholeState = new LinkedHashMap<byte[], byte[]>(); |
| | | put(wholeState, changelogStateDB.toGenIdEntry(baseDN, generationId)); |
| | | for (Integer serverId : serverIds) |
| | | { |
| | | put(wholeState, changelogStateDB.toReplicaEntry(baseDN, serverId)); |
| | | } |
| | | // encode data |
| | | final Map<byte[], byte[]> wholeState = new LinkedHashMap<byte[], byte[]>(); |
| | | put(wholeState, toGenIdEntry(baseDN, generationId)); |
| | | for (Integer serverId : replicas) |
| | | { |
| | | // the replica entry is converted with toByteArray() before being stored |
| | | put(wholeState, toByteArray(toReplicaEntry(baseDN, serverId))); |
| | | } |
| | | for (CSN offlineCSN : offlineReplicas) |
| | | { |
| | | put(wholeState, toReplicaOfflineEntry(baseDN, offlineCSN)); |
| | | } |
| | | |
| | | // decode data |
| | | final ChangelogState state = |
| | | changelogStateDB.decodeChangelogState(wholeState); |
| | | assertThat(state.getDomainToGenerationId()).containsExactly( |
| | | entry(baseDN, generationId)); |
| | | assertThat(state.getDomainToServerIds()).containsExactly( |
| | | entry(baseDN, serverIds)); |
| | | } |
| | | // decode data |
| | | final ChangelogState state = |
| | | changelogStateDB.decodeChangelogState(wholeState); |
| | | assertThat(state.getDomainToGenerationId()).containsExactly( |
| | | entry(baseDN, generationId)); |
| | | if (!replicas.isEmpty()) |
| | | { |
| | | assertThat(state.getDomainToServerIds()).containsExactly( |
| | | entry(baseDN, replicas)); |
| | | } |
| | | else |
| | | { |
| | | // no replica entry was encoded: the decoded map must hold no entry at all, |
| | | // not an entry mapping the base DN to an empty list |
| | | assertThat(state.getDomainToServerIds()).isEmpty(); |
| | | } |
| | | if (!offlineReplicas.isEmpty()) |
| | | { |
| | | assertThat(state.getOfflineReplicas()).containsExactly( |
| | | entry(baseDN, offlineReplicas)); |
| | | } |
| | | else |
| | | { |
| | | // same expectation for offline replicas: nothing encoded, nothing decoded |
| | | assertThat(state.getOfflineReplicas()).isEmpty(); |
| | | } |
| | | } |
| | | |
| | | /** Adds the entry to the map after converting its String key and value with {@code toBytes()}. */ |
| | | private void put(Map<byte[], byte[]> map, Entry<String, String> entry) |
| | | { |
| | | map.put(toBytes(entry.getKey()), toBytes(entry.getValue())); |
| | | } |
| | | /** Adds the entry's key and value to the map as they are, without any conversion. */ |
| | | private void put(Map<byte[], byte[]> map, Entry<byte[], byte[]> entry) |
| | | { |
| | | map.put(entry.getKey(), entry.getValue()); |
| | | } |
| | | |
| | | } |