OPENDJ-1116 Introduce abstraction for the changelog DB
After talking Matthew's over the phone review, did a few changes described down below.
*.java:
Changed changeNumber from int to long throughout the replication code base.
ECLUpdateMsg.java:
In getBytes(), did not complete the change to long because it would require a protocol version change.
ReplicaDBCursor.java:
Now extends Comparable.
ReplicaDBCursorComparator.java: REMOVED
Code has been moved to JEReplicaDBCursor.compareTo().
ChangeNumberIndexDB.java:
Does not extend Runnable anymore (mistake from earlier refactorings).
DraftCNDbHandler.java:
Now extends Runnable.
1 files deleted
17 files modified
| | |
| | | */ |
| | | package org.opends.server.replication.common; |
| | | |
| | | import static org.opends.messages.ExtensionMessages.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | |
| | | import java.util.Collections; |
| | | import java.util.List; |
| | | import java.util.Set; |
| | |
| | | import org.opends.server.util.ServerConstants; |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | | |
| | | |
| | | import static org.opends.messages.ExtensionMessages.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | |
| | | /** |
| | | * This class implements a virtual attribute provider that allows administrators |
| | |
| | | extends VirtualAttributeProvider<UserDefinedVirtualAttributeCfg> |
| | | implements ConfigurationChangeListener<UserDefinedVirtualAttributeCfg> |
| | | { |
| | | // The tracer object for the debug logger. |
| | | /** The tracer object for the debug logger. */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /** |
| | |
| | | @Override() |
| | | public Set<AttributeValue> getValues(Entry entry,VirtualAttributeRule rule) |
| | | { |
| | | String first="0"; |
| | | String first = "0"; |
| | | try |
| | | { |
| | | ECLWorkflowElement eclwe = (ECLWorkflowElement) |
| | |
| | | |
| | | ReplicationServer rs = eclwe.getReplicationServer(); |
| | | rs.disableEligibility(excludedDomains); |
| | | int[] limits = rs.getECLChangeNumberLimits( |
| | | long[] limits = rs.getECLChangeNumberLimits( |
| | | rs.getEligibleCSN(), excludedDomains); |
| | | |
| | | first = String.valueOf(limits[0]); |
| | |
| | | */ |
| | | package org.opends.server.replication.common; |
| | | |
| | | import static org.opends.messages.ExtensionMessages.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | |
| | | import java.util.Collections; |
| | | import java.util.List; |
| | | import java.util.Set; |
| | |
| | | import org.opends.server.util.ServerConstants; |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | | |
| | | |
| | | import static org.opends.messages.ExtensionMessages.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | |
| | | /** |
| | | * This class implements a virtual attribute provider that allows administrators |
| | |
| | | extends VirtualAttributeProvider<UserDefinedVirtualAttributeCfg> |
| | | implements ConfigurationChangeListener<UserDefinedVirtualAttributeCfg> |
| | | { |
| | | /** The tracer object for the debug logger. */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /** |
| | | * Creates a new instance of this member virtual attribute provider. |
| | | */ |
| | |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | |
| | | |
| | | ReplicationServer rs = eclwe.getReplicationServer(); |
| | | rs.disableEligibility(excludedDomains); |
| | | int[] limits = rs.getECLChangeNumberLimits( |
| | | long[] limits = rs.getECLChangeNumberLimits( |
| | | rs.getEligibleCSN(), excludedDomains); |
| | | |
| | | last = String.valueOf(limits[1]); |
| | |
| | | private MultiDomainServerState cookie; |
| | | |
| | | /** The changeNumber as specified by draft-good-ldap-changelog. */ |
| | | private int changeNumber; |
| | | private long changeNumber; |
| | | |
| | | /** |
| | | * Creates a new message. |
| | |
| | | { |
| | | byte[] byteCookie = String.valueOf(cookie).getBytes("UTF-8"); |
| | | byte[] byteBaseDN = String.valueOf(baseDN).getBytes("UTF-8"); |
| | | byte[] byteChangeNumber = Integer.toString(changeNumber).getBytes("UTF-8"); |
| | | // FIXME JNR Changing line below to use long would require a protocol |
| | | // version change. Leave it like this for now until the need arises. |
| | | byte[] byteChangeNumber = |
| | | Integer.toString((int) changeNumber).getBytes("UTF-8"); |
| | | byte[] byteUpdateMsg = updateMsg.getBytes(protocolVersion); |
| | | |
| | | int length = 1 + byteCookie.length + |
| | |
| | | * Setter for the changeNumber of this change. |
| | | * @param changeNumber the provided changeNumber for this change. |
| | | */ |
| | | public void setChangeNumber(int changeNumber) |
| | | public void setChangeNumber(long changeNumber) |
| | | { |
| | | this.changeNumber = changeNumber; |
| | | } |
| | |
| | | * Getter for the changeNumber of this change. |
| | | * @return the changeNumber of this change. |
| | | */ |
| | | public int getChangeNumber() |
| | | public long getChangeNumber() |
| | | { |
| | | return this.changeNumber; |
| | | } |
| | |
| | | return null; |
| | | } |
| | | |
| | | final int firstChangeNumber = cnIndexDB.getFirstChangeNumber(); |
| | | final long firstChangeNumber = cnIndexDB.getFirstChangeNumber(); |
| | | final String crossDomainStartState = |
| | | cnIndexDB.getPreviousCookie(firstChangeNumber); |
| | | cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber); |
| | |
| | | * Get the draftLimits (from the eligibleCSN got at the beginning of the |
| | | * operation) in order to have the first and possible last change number. |
| | | */ |
| | | final int[] limits = replicationServer.getECLChangeNumberLimits( |
| | | final long[] limits = replicationServer.getECLChangeNumberLimits( |
| | | eligibleCSN, excludedBaseDNs); |
| | | final int firstChangeNumber = limits[0]; |
| | | final int lastChangeNumber = limits[1]; |
| | | final long firstChangeNumber = limits[0]; |
| | | final long lastChangeNumber = limits[1]; |
| | | |
| | | // If the startChangeNumber provided is lower than the firstChangeNumber in |
| | | // the DB, let's use the lower limit. |
| | |
| | | return null; |
| | | } |
| | | |
| | | final int lastKey = cnIndexDB.getLastChangeNumber(); |
| | | final long lastKey = cnIndexDB.getLastChangeNumber(); |
| | | crossDomainStartState = cnIndexDB.getPreviousCookie(lastKey); |
| | | cnIndexDBCursor = cnIndexDB.getCursorFrom(lastKey); |
| | | return crossDomainStartState; |
| | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.*; |
| | | import org.opends.server.replication.server.changelog.api.ReplicaDBCursor; |
| | | import org.opends.server.types.*; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | |
| | | private NavigableSet<ReplicaDBCursor> collectAllCursorsWithChanges() |
| | | { |
| | | final NavigableSet<ReplicaDBCursor> results = |
| | | new TreeSet<ReplicaDBCursor>(new ReplicaDBCursorComparator()); |
| | | new TreeSet<ReplicaDBCursor>(); |
| | | for (int serverId : replicationServerDomain.getServerIds()) |
| | | { |
| | | // get the last already sent CN from that server to get a cursor |
| | |
| | | * <p> |
| | | * Guarded by cnIndexDBLock |
| | | **/ |
| | | private int lastGeneratedChangeNumber = 0; |
| | | private long lastGeneratedChangeNumber = 0; |
| | | |
| | | /** Used for protecting {@link ChangeNumberIndexDB} related state. */ |
| | | private final Object cnIndexDBLock = new Object(); |
| | |
| | | * |
| | | * @return the first value. |
| | | */ |
| | | public int getFirstChangeNumber() |
| | | public long getFirstChangeNumber() |
| | | { |
| | | synchronized (cnIndexDBLock) |
| | | { |
| | |
| | | * |
| | | * @return the last value. |
| | | */ |
| | | public int getLastChangeNumber() |
| | | public long getLastChangeNumber() |
| | | { |
| | | synchronized (cnIndexDBLock) |
| | | { |
| | |
| | | * |
| | | * @return The generated change number |
| | | */ |
| | | public int getNewChangeNumber() |
| | | public long getNewChangeNumber() |
| | | { |
| | | synchronized (cnIndexDBLock) |
| | | { |
| | |
| | | * @throws DirectoryException |
| | | * When it happens. |
| | | */ |
| | | public int[] getECLChangeNumberLimits(CSN crossDomainEligibleCSN, |
| | | public long[] getECLChangeNumberLimits(CSN crossDomainEligibleCSN, |
| | | Set<String> excludedBaseDNs) throws DirectoryException |
| | | { |
| | | /* The content of the DraftCNdb depends on the SEARCH operations done before |
| | |
| | | * (this diff is done domain by domain) |
| | | */ |
| | | |
| | | int lastChangeNumber; |
| | | long lastChangeNumber; |
| | | boolean dbEmpty = false; |
| | | final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB(); |
| | | |
| | | int firstChangeNumber = cnIndexDB.getFirstChangeNumber(); |
| | | long firstChangeNumber = cnIndexDB.getFirstChangeNumber(); |
| | | Map<String, ServerState> domainsServerStateForLastCN = null; |
| | | CSN csnForLastCN = null; |
| | | String domainForLastCN = null; |
| | |
| | | firstChangeNumber += lastGeneratedChangeNumber; |
| | | lastChangeNumber += lastGeneratedChangeNumber; |
| | | } |
| | | return new int[]{firstChangeNumber, lastChangeNumber}; |
| | | return new long[] { firstChangeNumber, lastChangeNumber }; |
| | | } |
| | | |
| | | /** |
| | |
| | | * @see <a href= "http://tools.ietf.org/html/draft-good-ldap-changelog-04" |
| | | * >OpenDJ Domain Names</a> for more information about the changeNumber. |
| | | */ |
| | | public interface ChangeNumberIndexDB extends Runnable |
| | | public interface ChangeNumberIndexDB |
| | | { |
| | | |
| | | /** |
| | |
| | | * the provided change number. |
| | | * @return the associated CSN, null when none. |
| | | */ |
| | | public CSN getCSN(int changeNumber); |
| | | public CSN getCSN(long changeNumber); |
| | | |
| | | /** |
| | | * Get the baseDN associated to a provided change number. |
| | |
| | | * the provided change number. |
| | | * @return the baseDN, null when none. |
| | | */ |
| | | public String getBaseDN(int changeNumber); |
| | | public String getBaseDN(long changeNumber); |
| | | |
| | | /** |
| | | * Get the previous cookie associated to a provided change number. |
| | |
| | | * the provided change number. |
| | | * @return the previous cookie, null when none. |
| | | */ |
| | | String getPreviousCookie(int changeNumber); |
| | | String getPreviousCookie(long changeNumber); |
| | | |
| | | /** |
| | | * Get the first change number stored in this DB. |
| | | * |
| | | * @return Returns the first change number in this DB. |
| | | */ |
| | | int getFirstChangeNumber(); |
| | | long getFirstChangeNumber(); |
| | | |
| | | /** |
| | | * Get the last change number stored in this DB. |
| | | * |
| | | * @return Returns the last change number in this DB |
| | | */ |
| | | int getLastChangeNumber(); |
| | | long getLastChangeNumber(); |
| | | |
| | | /** |
| | | * Add an update to the list of messages that must be saved to this DB managed |
| | |
| | | * @param csn |
| | | * The associated replication CSN. |
| | | */ |
| | | void add(int changeNumber, String previousCookie, String baseDN, CSN csn); |
| | | void add(long changeNumber, String previousCookie, String baseDN, CSN csn); |
| | | |
| | | /** |
| | | * Generate a new {@link ChangeNumberIndexDBCursor} that allows to browse the |
| | |
| | | * @throws ChangelogException |
| | | * if a database problem happened. |
| | | */ |
| | | ChangeNumberIndexDBCursor getCursorFrom(int startChangeNumber) |
| | | ChangeNumberIndexDBCursor getCursorFrom(long startChangeNumber) |
| | | throws ChangelogException; |
| | | |
| | | /** |
| | |
| | | * |
| | | * @return The change number field. |
| | | */ |
| | | int getChangeNumber(); |
| | | long getChangeNumber(); |
| | | |
| | | /** |
| | | * Skip to the next record of the database. |
| | |
| | | * This cursor allows to iterate through the changes received from a given |
| | | * replica (Directory Server) in the topology. |
| | | */ |
| | | public interface ReplicaDBCursor extends Closeable |
| | | public interface ReplicaDBCursor extends Closeable, Comparable<ReplicaDBCursor> |
| | | { |
| | | |
| | | /** |
| | |
| | | * @param csn the provided replication CSN to be |
| | | * stored associated with this change number. |
| | | */ |
| | | public void addEntry(int changeNumber, String value, String domainBaseDN, |
| | | public void addEntry(long changeNumber, String value, String domainBaseDN, |
| | | CSN csn) |
| | | { |
| | | try |
| | |
| | | * creation. |
| | | * @return The ReplServerDBCursor. |
| | | */ |
| | | public DraftCNDBCursor openReadCursor(int changeNumber) |
| | | public DraftCNDBCursor openReadCursor(long changeNumber) |
| | | throws ChangelogException |
| | | { |
| | | return new DraftCNDBCursor(changeNumber); |
| | |
| | | * @throws ChangelogException |
| | | * when the startChangeNumber does not exist. |
| | | */ |
| | | private DraftCNDBCursor(int startChangeNumber) throws ChangelogException |
| | | private DraftCNDBCursor(long startChangeNumber) throws ChangelogException |
| | | { |
| | | this.key = new ReplicationDraftCNKey(startChangeNumber); |
| | | this.entry = new DatabaseEntry(); |
| | |
| | | // We could not move the cursor to the expected startChangeNumber |
| | | if (localCursor.getSearchKeyRange(key, entry, DEFAULT) != SUCCESS) |
| | | { |
| | | // We could not even move the cursor closed to it => failure |
| | | // We could not even move the cursor close to it => failure |
| | | throw new ChangelogException( |
| | | Message.raw("ChangeLog Change Number " + startChangeNumber |
| | | + " is not available")); |
| | |
| | | * This class publishes some monitoring information below <code> |
| | | * cn=monitor</code>. |
| | | */ |
| | | public class DraftCNDbHandler implements ChangeNumberIndexDB |
| | | public class DraftCNDbHandler implements ChangeNumberIndexDB, Runnable |
| | | { |
| | | /** |
| | | * The tracer object for the debug logger. |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public synchronized void add(int changeNumber, String previousCookie, |
| | | public synchronized void add(long changeNumber, String previousCookie, |
| | | String baseDN, CSN csn) |
| | | { |
| | | db.addEntry(changeNumber, previousCookie, baseDN, csn); |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public int getFirstChangeNumber() |
| | | public long getFirstChangeNumber() |
| | | { |
| | | return db.readFirstChangeNumber(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public int getLastChangeNumber() |
| | | public long getLastChangeNumber() |
| | | { |
| | | return db.readLastChangeNumber(); |
| | | } |
| | |
| | | * @throws ChangelogException |
| | | * if a database problem occurs. |
| | | */ |
| | | DraftCNDBCursor getReadCursor(int startChangeNumber) |
| | | DraftCNDBCursor getReadCursor(long startChangeNumber) |
| | | throws ChangelogException |
| | | { |
| | | return db.openReadCursor(startChangeNumber); |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ChangeNumberIndexDBCursor getCursorFrom(int startChangeNumber) |
| | | public ChangeNumberIndexDBCursor getCursorFrom(long startChangeNumber) |
| | | throws ChangelogException |
| | | { |
| | | return new DraftCNDbIterator(db, startChangeNumber); |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String getPreviousCookie(int changeNumber) |
| | | public String getPreviousCookie(long changeNumber) |
| | | { |
| | | DraftCNDBCursor cursor = null; |
| | | try |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public CSN getCSN(int changeNumber) |
| | | public CSN getCSN(long changeNumber) |
| | | { |
| | | DraftCNDBCursor cursor = null; |
| | | try |
| | |
| | | |
| | | /**{@inheritDoc}*/ |
| | | @Override |
| | | public String getBaseDN(int changeNumber) |
| | | public String getBaseDN(long changeNumber) |
| | | { |
| | | DraftCNDBCursor cursor = null; |
| | | try |
| | |
| | | } |
| | | } |
| | | |
| | | private void debugException(String methodName, int changeNumber, Exception e) |
| | | private void debugException(String methodName, long changeNumber, Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In DraftCNDbHandler." + methodName + "(), read: " |
| | |
| | | * @throws ChangelogException |
| | | * If a database problem happened. |
| | | */ |
| | | public DraftCNDbIterator(DraftCNDB db, int startChangeNumber) |
| | | public DraftCNDbIterator(DraftCNDB db, long startChangeNumber) |
| | | throws ChangelogException |
| | | { |
| | | draftCNDbCursor = db.openReadCursor(startChangeNumber); |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public int getChangeNumber() |
| | | public long getChangeNumber() |
| | | { |
| | | return ((ReplicationDraftCNKey) draftCNDbCursor.getKey()).getChangeNumber(); |
| | | } |
| | |
| | | { |
| | | close(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public int compareTo(ReplicaDBCursor o) |
| | | { |
| | | final CSN csn1 = getChange().getCSN(); |
| | | final CSN csn2 = o.getChange().getCSN(); |
| | | |
| | | return CSN.compare(csn1, csn2); |
| | | } |
| | | } |
| | |
| | | * Creates a new ReplicationKey from the given change number. |
| | | * @param changeNumber The change number to use. |
| | | */ |
| | | public ReplicationDraftCNKey(int changeNumber) |
| | | public ReplicationDraftCNKey(long changeNumber) |
| | | { |
| | | try |
| | | { |
| | | String s = String.valueOf(changeNumber); |
| | | int a = 16-s.length(); |
| | | String sscn = "0000000000000000".substring(0, a) + s; |
| | | // Should it use StaticUtils.getBytes() to increase performances? |
| | | setData(sscn.getBytes("UTF-8")); |
| | | } catch (UnsupportedEncodingException e) |
| | | setData(String.format("%016d", changeNumber).getBytes("UTF-8")); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // Should never happens, UTF-8 is always supported |
| | | // TODO : add better logging |
| | |
| | | |
| | | /** |
| | | * Getter for the change number associated with this key. |
| | | * |
| | | * @return the change number associated with this key. |
| | | */ |
| | | public int getChangeNumber() |
| | | public long getChangeNumber() |
| | | { |
| | | return Integer.valueOf(new String(getData())); |
| | | return Long.valueOf(new String(getData())); |
| | | } |
| | | } |
| | |
| | | String clearLDIFchanges, |
| | | String targetUUID, |
| | | List<RawAttribute> includedAttributes, |
| | | int changenumber, |
| | | long changenumber, |
| | | String changetype, |
| | | String changeInitiatorsName) |
| | | throws DirectoryException |
| | |
| | | ReplicationServer rs = eclwe.getReplicationServer(); |
| | | rs.disableEligibility(excludedDomains); |
| | | long t1 = TimeThread.getTime(); |
| | | int[] limits = replicationServer.getECLChangeNumberLimits( |
| | | long[] limits = replicationServer.getECLChangeNumberLimits( |
| | | replicationServer.getEligibleCSN(), excludedDomains); |
| | | assertEquals(limits[1], maxMsg); |
| | | long t2 = TimeThread.getTime(); |
| | |
| | | handler.add(cn3, value3, baseDN3, csn3); |
| | | |
| | | // The ChangeNumber should not get purged |
| | | final int firstChangeNumber = handler.getFirstChangeNumber(); |
| | | final long firstChangeNumber = handler.getFirstChangeNumber(); |
| | | assertEquals(firstChangeNumber, cn1); |
| | | assertEquals(handler.getLastChangeNumber(), cn3); |
| | | |