Code cleanup:
- hid ReplicationDomain internals
- moved test code out of production code
- removed duplicated test code
ReplicationDomain.java:
In ctor, delegate to other ctor.
In startPublishService(), removed ReplicationDomainCfg parameter and get the config from field instead.
Removed getConfig(), not used anymore.
Moved publish(byte[]) to ReplicationDomainTest.
LDAPReplicationDomain.java, AssuredReplicationServerTest.java, FakeStressReplicationDomain.java:
Consequence of the change to ReplicationDomain.startPublishService().
FakeReplicationDomain.java:
Expanded the code to support what FractionalReplicationTest.FakeReplicationDomain was doing.
Consequence of the change to ReplicationDomain.startPublishService().
FractionalReplicationTest.java:
Removed FakeReplicationDomain inner class, replaced by org.opends.server.replication.service.FakeReplicationDomain.
Removed newConfig(), now unused.
ReplicationDomainTest.java:
Moved ReplicationDomain.publish(byte[]) here.
Extracted method publishRepeatedly().
| | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.admin.server.ConfigurationChangeListener; |
| | | import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*; |
| | | import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.IsolationPolicy; |
| | | import org.opends.server.admin.std.server.ExternalChangelogDomainCfg; |
| | | import org.opends.server.admin.std.server.ReplicationDomainCfg; |
| | | import org.opends.server.api.AlertGenerator; |
| | |
| | | import org.opends.server.util.LDIFReader; |
| | | import org.opends.server.util.TimeThread; |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | | import org.opends.server.workflowelement.localbackend.*; |
| | | import org.opends.server.workflowelement.localbackend.LocalBackendModifyOperation; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.messages.ToolMessages.*; |
| | |
| | | // register as an AlertGenerator |
| | | DirectoryServer.registerAlertGenerator(this); |
| | | |
| | | startPublishService(configuration); |
| | | startPublishService(); |
| | | } |
| | | |
| | | /** |
| | |
| | | * The startup phase of the ReplicationDomain subclass, |
| | | * should read the list of replication servers from the configuration, |
| | | * instantiate a {@link ServerState} then start the publish service |
| | | * by calling {@link #startPublishService(ReplicationDomainCfg)}. |
| | | * by calling {@link #startPublishService()}. |
| | | * At this point it can start calling the {@link #publish(UpdateMsg)} |
| | | * method if needed. |
| | | * <p> |
| | |
| | | */ |
| | | public ReplicationDomain(ReplicationDomainCfg config, long generationId) |
| | | { |
| | | this.config = config; |
| | | this.assuredConfig = config; |
| | | this.generationId = generationId; |
| | | this.state = new ServerState(); |
| | | this.generator = new CSNGenerator(getServerId(), state); |
| | | this(config, generationId, new ServerState()); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns the current config of this ReplicationDomain. |
| | | * |
| | | * @return the config |
| | | */ |
| | | protected ReplicationDomainCfg getConfig() |
| | | { |
| | | return config; |
| | | } |
| | | |
| | | /** |
| | | * Returns the base DN of this ReplicationDomain. All Replication Domain using |
| | | * this baseDN will be connected through the Replication Service. |
| | | * |
| | |
| | | * has been called, the publish service can be used by calling the |
| | | * {@link #publish(UpdateMsg)} method. |
| | | * |
| | | * @param config |
| | | * The configuration that should be used. |
| | | * @throws ConfigException |
| | | * If the DirectoryServer configuration was incorrect. |
| | | */ |
| | | public void startPublishService(ReplicationDomainCfg config) |
| | | throws ConfigException |
| | | public void startPublishService() throws ConfigException |
| | | { |
| | | synchronized (sessionLock) |
| | | { |
| | |
| | | * calling the {@link #processUpdate(UpdateMsg)}. |
| | | * <p> |
| | | * This method must be called once and must be called after the |
| | | * {@link #startPublishService(ReplicationDomainCfg)}. |
| | | * {@link #startPublishService()}. |
| | | */ |
| | | public void startListenService() |
| | | { |
| | |
| | | * <p> |
| | | * The Replication Service will restart from the point indicated by the |
| | | * {@link ServerState} that was given as a parameter to the |
| | | * {@link #startPublishService(ReplicationDomainCfg)} at startup time. |
| | | * {@link #startPublishService()} at startup time. |
| | | * <p> |
| | | * If some data have changed in the repository during the period of time when |
| | | * the Replication Service was disabled, this {@link ServerState} should |
| | |
| | | } |
| | | |
| | | /** |
| | | * Publish information to the Replication Service (not assured mode). |
| | | * |
| | | * @param msg The byte array containing the information that should |
| | | * be sent to the remote entities. |
| | | */ |
| | | void publish(byte[] msg) |
| | | { |
| | | UpdateMsg update; |
| | | synchronized (this) |
| | | { |
| | | update = new UpdateMsg(generator.newCSN(), msg); |
| | | /* |
| | | If assured replication is configured, this will prepare blocking |
| | | mechanism. If assured replication is disabled, this returns immediately |
| | | */ |
| | | prepareWaitForAckIfAssuredEnabled(update); |
| | | |
| | | publish(update); |
| | | } |
| | | |
| | | try |
| | | { |
| | | /* |
| | | If assured replication is enabled, this will wait for the matching ack or |
| | | time out. If assured replication is disabled, this returns immediately |
| | | */ |
| | | waitForAckIfAssuredEnabled(update); |
| | | } catch (TimeoutException ex) |
| | | { |
| | | // This exception may only be raised if assured replication is enabled |
| | | logError(NOTE_DS_ACK_TIMEOUT.get(getBaseDNString(), |
| | | Long.toString(getAssuredTimeout()), update.toString())); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This method should return the generationID to use for this |
| | | * ReplicationDomain. |
| | | * This method can be called at any time after the ReplicationDomain |
| | |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | |
| | | import java.io.IOException; |
| | | import java.io.InputStream; |
| | | import java.io.OutputStream; |
| | | import java.util.*; |
| | | import java.util.concurrent.BlockingQueue; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.backends.task.Task; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.ReplicationTestCase; |
| | |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.ModifyDNMsg; |
| | | import org.opends.server.replication.protocol.ModifyMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.service.FakeReplicationDomain; |
| | | import org.opends.server.replication.service.ReplicationDomain; |
| | | import org.opends.server.types.*; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.DataProvider; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.TestCaseUtils.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | |
| | | private static final String ENTRY_UUID3 = |
| | | "33333333-3333-3333-3333-333333333333"; |
| | | /** Dn of the manipulated entry */ |
| | | private static String ENTRY_DN = "uid=1," + TEST_ROOT_DN_STRING; |
| | | private static final String ENTRY_DN = "uid=1," + TEST_ROOT_DN_STRING; |
| | | |
| | | /** |
| | | * Optional attribute not part of concerned attributes of the fractional |
| | |
| | | /** Second test backend */ |
| | | private static final String TEST2_ROOT_DN_STRING = "dc=example,dc=com"; |
| | | private static final String TEST2_ORG_DN_STRING = "o=test2," + TEST2_ROOT_DN_STRING; |
| | | private static String ENTRY_DN2 = "uid=1," + TEST2_ORG_DN_STRING; |
| | | private static final String ENTRY_DN2 = "uid=1," + TEST2_ORG_DN_STRING; |
| | | |
| | | private void debugInfo(String s) { |
| | | logError(Message.raw(Category.SYNC, Severity.NOTICE, s)); |
| | |
| | | replicationServer = new ReplicationServer(conf); |
| | | } |
| | | |
| | | private static DomainFakeCfg newConfig(DN baseDN, int serverID, |
| | | SortedSet<String> replicationServers, long heartbeatInterval) |
| | | { |
| | | DomainFakeCfg fakeCfg = |
| | | new DomainFakeCfg(baseDN, serverID, replicationServers); |
| | | fakeCfg.setHeartbeatInterval(heartbeatInterval); |
| | | fakeCfg.setChangetimeHeartbeatInterval(500); |
| | | return fakeCfg; |
| | | } |
| | | |
| | | /** |
| | | * This class is the minimum implementation of a Concrete ReplicationDomain |
| | | * used to be able to connect to the RS with a known genid. Also to be able |
| | | * to send updates |
| | | */ |
| | | private class FakeReplicationDomain extends ReplicationDomain |
| | | { |
| | | /** |
| | | * A blocking queue that is used to receive updates from the Replication |
| | | * Service. |
| | | */ |
| | | private BlockingQueue<UpdateMsg> queue = |
| | | new LinkedBlockingQueue<UpdateMsg>(); |
| | | |
| | | /** A string that will be exported should exportBackend be called. */ |
| | | private String exportString; |
| | | |
| | | /** |
| | | * A StringBuilder that will be used to build a new String should the import |
| | | * be called. |
| | | */ |
| | | private StringBuilder importString; |
| | | private int exportedEntryCount; |
| | | |
| | | public FakeReplicationDomain(DN baseDN, int serverID, |
| | | SortedSet<String> replicationServers, long heartbeatInterval, |
| | | long generationId) throws ConfigException |
| | | { |
| | | super(newConfig(baseDN, serverID, replicationServers, heartbeatInterval), |
| | | generationId); |
| | | startPublishService(getConfig()); |
| | | startListenService(); |
| | | } |
| | | |
| | | public void initExport(String exportString, int exportedEntryCount) |
| | | { |
| | | this.exportString = exportString; |
| | | this.exportedEntryCount = exportedEntryCount; |
| | | } |
| | | |
| | | @Override |
| | | public long countEntries() throws DirectoryException |
| | | { |
| | | return exportedEntryCount; |
| | | } |
| | | |
| | | @Override |
| | | protected void exportBackend(OutputStream output) throws DirectoryException |
| | | { |
| | | try |
| | | { |
| | | output.write(exportString.getBytes()); |
| | | output.flush(); |
| | | output.close(); |
| | | } catch (IOException e) |
| | | { |
| | | throw new DirectoryException(ResultCode.OPERATIONS_ERROR, |
| | | ERR_BACKEND_EXPORT_ENTRY.get("", "")); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | protected void importBackend(InputStream input) throws DirectoryException |
| | | { |
| | | byte[] buffer = new byte[1000]; |
| | | |
| | | int ret; |
| | | do |
| | | { |
| | | try |
| | | { |
| | | ret = input.read(buffer, 0, 1000); |
| | | } catch (IOException e) |
| | | { |
| | | throw new DirectoryException( |
| | | ResultCode.OPERATIONS_ERROR, |
| | | ERR_BACKEND_EXPORT_ENTRY.get("", "")); |
| | | } |
| | | importString.append(new String(buffer, 0, ret)); |
| | | } while (ret >= 0); |
| | | } |
| | | |
| | | @Override |
| | | public boolean processUpdate(UpdateMsg updateMsg) |
| | | { |
| | | if (queue != null) |
| | | queue.add(updateMsg); |
| | | return true; |
| | | } |
| | | } |
| | | |
| | | private static final String REPLICATION_GENERATION_ID = |
| | | "ds-sync-generation-id"; |
| | | private static final Task NO_INIT_TASK = null; |
| | |
| | | private static final int OTHER_GID_BIS = 3; |
| | | |
| | | /** Default generation id */ |
| | | private static long DEFAULT_GENID = EMPTY_DN_GENID; |
| | | private static final long DEFAULT_GENID = EMPTY_DN_GENID; |
| | | /** Other generation id */ |
| | | private static long OTHER_GENID = 500L; |
| | | private static final long OTHER_GENID = 500L; |
| | | |
| | | /* |
| | | * Definitions for the scenario of the fake DS |
| | |
| | | FakeReplicationDomain fakeReplicationDomain = |
| | | new FakeReplicationDomain(config, generationId, scenario, serverState); |
| | | |
| | | fakeReplicationDomain.startPublishService(config); |
| | | fakeReplicationDomain.startPublishService(); |
| | | if (startListen) |
| | | fakeReplicationDomain.startListenService(); |
| | | |
| | |
| | | * According to the configured scenario, it will answer to updates with acks |
| | | * as the scenario is requesting. |
| | | */ |
| | | public class FakeReplicationDomain extends ReplicationDomain |
| | | private class FakeReplicationDomain extends ReplicationDomain |
| | | { |
| | | /** The scenario this DS is expecting */ |
| | | private int scenario = -1; |
| | | private final int scenario; |
| | | |
| | | private CSNGenerator gen; |
| | | private final CSNGenerator gen; |
| | | |
| | | /** False if a received update had assured parameters not as expected */ |
| | | private boolean everyUpdatesAreOk = true; |
| | | /** Number of received updates */ |
| | | private int nReceivedUpdates = 0; |
| | | |
| | | private int nWrongReceivedUpdates = 0; |
| | | |
| | | /** |
| | |
| | | * behavior upon reception of updates) |
| | | * @throws org.opends.server.config.ConfigException |
| | | */ |
| | | public FakeReplicationDomain(ReplicationDomainCfg config, |
| | | private FakeReplicationDomain(ReplicationDomainCfg config, |
| | | long generationId, int scenario, ServerState serverState) |
| | | throws ConfigException |
| | | { |
| | |
| | | gen = new CSNGenerator(config.getServerId(), 0L); |
| | | } |
| | | |
| | | public boolean receivedUpdatesOk() |
| | | private boolean receivedUpdatesOk() |
| | | { |
| | | return everyUpdatesAreOk; |
| | | } |
| | |
| | | * Sends a new update from this DS |
| | | * @throws TimeoutException If timeout waiting for an assured ack |
| | | */ |
| | | public void sendNewFakeUpdate() throws TimeoutException |
| | | private void sendNewFakeUpdate() throws TimeoutException |
| | | { |
| | | // Create a new delete update message (the simplest to create) |
| | | DeleteMsg delMsg = new DeleteMsg(getBaseDN(), gen.newCSN(), UUID.randomUUID().toString()); |
| | |
| | | private String exportString; |
| | | |
| | | /** |
| | | * A StringBuilder that will be used to build a build a new String should the |
| | | * import be called. |
| | | * A StringBuilder that will be used to build a new String should the import |
| | | * be called. |
| | | */ |
| | | private StringBuilder importString; |
| | | |
| | | private int exportedEntryCount; |
| | | |
| | | private FakeReplicationDomain(DN baseDN, int serverID, |
| | | SortedSet<String> replicationServers, int window, long heartbeatInterval) |
| | | throws ConfigException |
| | | SortedSet<String> replicationServers, int window, long heartbeatInterval, |
| | | long generationId) throws ConfigException |
| | | { |
| | | super(newConfig(baseDN, serverID, replicationServers, window, |
| | | heartbeatInterval), 1); |
| | | startPublishService(getConfig()); |
| | | super(newConfig(baseDN, serverID, replicationServers, window, heartbeatInterval), generationId); |
| | | startPublishService(); |
| | | startListenService(); |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | public FakeReplicationDomain(DN baseDN, int serverID, |
| | | SortedSet<String> replicationServers, long heartbeatInterval, |
| | | long generationId) throws ConfigException |
| | | { |
| | | this(baseDN, serverID, replicationServers, 100, heartbeatInterval, generationId); |
| | | } |
| | | |
| | | FakeReplicationDomain(DN baseDN, int serverID, |
| | | SortedSet<String> replicationServers, int window, long heartbeatInterval, |
| | | BlockingQueue<UpdateMsg> queue) throws ConfigException |
| | | { |
| | | this(baseDN, serverID, replicationServers, window, heartbeatInterval); |
| | | this(baseDN, serverID, replicationServers, window, heartbeatInterval, 1); |
| | | this.queue = queue; |
| | | } |
| | | |
| | | public FakeReplicationDomain(DN baseDN, int serverID, |
| | | FakeReplicationDomain(DN baseDN, int serverID, |
| | | SortedSet<String> replicationServers, long heartbeatInterval, |
| | | String exportString, StringBuilder importString, int exportedEntryCount) |
| | | throws ConfigException |
| | | { |
| | | this(baseDN, serverID, replicationServers, 100, heartbeatInterval); |
| | | this(baseDN, serverID, replicationServers, 100, heartbeatInterval, 1); |
| | | this.exportString = exportString; |
| | | this.importString = importString; |
| | | this.exportedEntryCount = exportedEntryCount; |
| | | } |
| | | |
| | | public void initExport(String exportString, int exportedEntryCount) |
| | | { |
| | | this.exportString = exportString; |
| | | this.exportedEntryCount = exportedEntryCount; |
| | | } |
| | | |
| | | @Override |
| | | public long countEntries() throws DirectoryException |
| | | { |
| | |
| | | * used to test the Generic Replication Service. |
| | | */ |
| | | @SuppressWarnings("javadoc") |
| | | public class FakeStressReplicationDomain extends ReplicationDomain |
| | | class FakeStressReplicationDomain extends ReplicationDomain |
| | | { |
| | | /** |
| | | * A blocking queue that is used to send the UpdateMsg received from the |
| | | * Replication Service. |
| | | */ |
| | | private BlockingQueue<UpdateMsg> queue; |
| | | private final BlockingQueue<UpdateMsg> queue; |
| | | |
| | | public FakeStressReplicationDomain(DN baseDN, int serverID, |
| | | FakeStressReplicationDomain(DN baseDN, int serverID, |
| | | SortedSet<String> replicationServers, long heartbeatInterval, |
| | | BlockingQueue<UpdateMsg> queue) throws ConfigException |
| | | { |
| | | super(newConfig(baseDN, serverID, replicationServers, heartbeatInterval), 1); |
| | | startPublishService(getConfig()); |
| | | startPublishService(); |
| | | startListenService(); |
| | | this.queue = queue; |
| | | } |
| | |
| | | import java.util.concurrent.BlockingQueue; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.TimeoutException; |
| | | |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.backends.task.Task; |
| | |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.service.ReplicationDomain.*; |
| | | import org.opends.server.replication.service.ReplicationDomain.ImportExportContext; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.testng.annotations.DataProvider; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.TestCaseUtils.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.testng.Assert.*; |
| | | |
| | | /** |
| | |
| | | * Check that domain2 receives it shortly after. |
| | | */ |
| | | byte[] test = {1, 2, 3 ,4, 0, 1, 2, 3, 4, 5}; |
| | | domain1.publish(test); |
| | | publish(domain1, test); |
| | | |
| | | UpdateMsg rcvdMsg = rcvQueue2.poll(20, TimeUnit.SECONDS); |
| | | assertNotNull(rcvdMsg); |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Publish information to the Replication Service (not assured mode). |
| | | * |
| | | * @param msg The byte array containing the information that should |
| | | * be sent to the remote entities. |
| | | */ |
| | | void publish(FakeReplicationDomain domain, byte[] msg) |
| | | { |
| | | UpdateMsg updateMsg; |
| | | synchronized (this) |
| | | { |
| | | updateMsg = new UpdateMsg(domain.getGenerator().newCSN(), msg); |
| | | // If assured replication is configured, |
| | | // this will prepare blocking mechanism. |
| | | // If assured replication is disabled, this returns immediately |
| | | domain.prepareWaitForAckIfAssuredEnabled(updateMsg); |
| | | domain.publish(updateMsg); |
| | | } |
| | | |
| | | try |
| | | { |
| | | // If assured replication is enabled, |
| | | // this will wait for the matching ack or time out. |
| | | // If assured replication is disabled, this returns immediately |
| | | domain.waitForAckIfAssuredEnabled(updateMsg); |
| | | } |
| | | catch (TimeoutException ex) |
| | | { |
| | | // This exception may only be raised if assured replication is enabled |
| | | logError(NOTE_DS_ACK_TIMEOUT.get(domain.getBaseDNString(), Long |
| | | .toString(domain.getAssuredTimeout()), updateMsg.toString())); |
| | | } |
| | | } |
| | | |
| | | private void assertExpectedServerStatuses(Map<Integer, DSInfo> dsInfos, |
| | | int domain1ServerId, int domain2ServerId) |
| | | { |
| | |
| | | */ |
| | | byte[] test = {1, 2, 3 ,4, 0, 1, 2, 3, 4, 5}; |
| | | |
| | | long timeStart = System.nanoTime(); |
| | | for (int i=0; i< 100000; i++) |
| | | domain1.publish(test); |
| | | long timeNow = System.nanoTime(); |
| | | System.out.println(timeNow - timeStart); |
| | | |
| | | timeStart = timeNow; |
| | | for (int i=0; i< 100000; i++) |
| | | domain1.publish(test); |
| | | timeNow = System.nanoTime(); |
| | | System.out.println(timeNow - timeStart); |
| | | |
| | | timeStart = timeNow; |
| | | for (int i=0; i< 100000; i++) |
| | | domain1.publish(test); |
| | | timeNow = System.nanoTime(); |
| | | System.out.println(timeNow - timeStart); |
| | | |
| | | timeStart = timeNow; |
| | | for (int i=0; i< 100000; i++) |
| | | domain1.publish(test); |
| | | timeNow = System.nanoTime(); |
| | | System.out.println(timeNow - timeStart); |
| | | timeNow = publishRepeatedly(domain1, test, timeNow); |
| | | timeNow = publishRepeatedly(domain1, test, timeNow); |
| | | timeNow = publishRepeatedly(domain1, test, timeNow); |
| | | timeNow = publishRepeatedly(domain1, test, timeNow); |
| | | } |
| | | finally |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | private long publishRepeatedly(FakeReplicationDomain domain1, byte[] test, long timeNow) |
| | | { |
| | | long timeStart = timeNow; |
| | | for (int i = 0; i < 100000; i++) |
| | | { |
| | | publish(domain1, test); |
| | | } |
| | | timeNow = System.nanoTime(); |
| | | System.out.println(timeNow - timeStart); |
| | | return timeNow; |
| | | } |
| | | |
| | | private ReplicationServer createReplicationServer(int serverId, |
| | | int replicationPort, String dirName, int windowSize, |
| | | String... replServers) throws Exception |