| | |
| | | |
| | | package org.opends.server.synchronization; |
| | | |
| | | import static org.opends.server.loggers.Error.logError; |
| | | import static org.testng.Assert.assertEquals; |
| | | import static org.testng.Assert.assertNotNull; |
| | | import static org.testng.Assert.assertTrue; |
| | |
| | | import org.opends.server.types.AttributeValue; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.types.ErrorLogCategory; |
| | | import org.opends.server.types.ErrorLogSeverity; |
| | | import org.opends.server.types.InitializationException; |
| | | import org.opends.server.types.Modification; |
| | | import org.opends.server.types.ModificationType; |
| | |
| | | @Test(enabled=true, groups="slow") |
| | | public void fromServertoBroker() throws Exception |
| | | { |
| | | |
| | | logError(ErrorLogCategory.SYNCHRONIZATION, |
| | | ErrorLogSeverity.NOTICE, |
| | | "Starting Synchronization StressTest : fromServertoBroker" , 1); |
| | | |
| | | final DN baseDn = DN.decode("ou=People,dc=example,dc=com"); |
| | | final int TOTAL_MESSAGES = 1000; |
| | | cleanEntries(); |
| | | |
| | | ChangelogBroker broker = openChangelogSession(baseDn, (short) 3); |
| | | ChangelogBroker broker = openChangelogSession(baseDn, (short) 18); |
| | | DirectoryServer.registerMonitorProvider(this); |
| | | |
| | | try { |
| | | /* |
| | | * loop receiving update until there is nothing left |
| | | * to make sure that message from previous tests have been consumed. |
| | | */ |
| | | try |
| | | { |
| | | while (true) |
| | | /* |
| | | * loop receiving update until there is nothing left |
| | | * to make sure that message from previous tests have been consumed. |
| | | */ |
| | | try |
| | | { |
| | | broker.receive(); |
| | | while (true) |
| | | { |
| | | broker.receive(); |
| | | } |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { } |
| | | /* |
| | | * Test that operations done on this server are sent to the |
| | | * changelog server and forwarded to our changelog broker session. |
| | | */ |
| | | catch (Exception e) |
| | | { } |
| | | /* |
| | | * Test that operations done on this server are sent to the |
| | | * changelog server and forwarded to our changelog broker session. |
| | | */ |
| | | |
| | | // Create an Entry (add operation) that will be later used in the test. |
| | | Entry tmp = personEntry.duplicate(); |
| | | AddOperation addOp = new AddOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, tmp.getDN(), |
| | | tmp.getObjectClasses(), tmp.getUserAttributes(), |
| | | tmp.getOperationalAttributes()); |
| | | addOp.run(); |
| | | entryList.add(personEntry); |
| | | assertNotNull(DirectoryServer.getEntry(personEntry.getDN()), |
| | | "The Add Entry operation failed"); |
| | | // Create an Entry (add operation) that will be later used in the test. |
| | | Entry tmp = personEntry.duplicate(); |
| | | AddOperation addOp = new AddOperation(connection, |
| | | InternalClientConnection.nextOperationID(), InternalClientConnection |
| | | .nextMessageID(), null, tmp.getDN(), |
| | | tmp.getObjectClasses(), tmp.getUserAttributes(), |
| | | tmp.getOperationalAttributes()); |
| | | addOp.run(); |
| | | entryList.add(personEntry); |
| | | assertNotNull(DirectoryServer.getEntry(personEntry.getDN()), |
| | | "The Add Entry operation failed"); |
| | | |
| | | // Check if the client has received the msg |
| | | SynchronizationMessage msg = broker.receive(); |
| | | assertTrue(msg instanceof AddMsg, |
| | | "The received synchronization message is not an ADD msg"); |
| | | AddMsg addMsg = (AddMsg) msg; |
| | | // Check if the client has received the msg |
| | | SynchronizationMessage msg = broker.receive(); |
| | | assertTrue(msg instanceof AddMsg, |
| | | "The received synchronization message is not an ADD msg"); |
| | | AddMsg addMsg = (AddMsg) msg; |
| | | |
| | | Operation receivedOp = addMsg.createOperation(connection); |
| | | assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0, |
| | | "The received synchronization message is not an ADD msg"); |
| | | Operation receivedOp = addMsg.createOperation(connection); |
| | | assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0, |
| | | "The received synchronization message is not an ADD msg"); |
| | | |
| | | assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(), |
| | | "The received ADD synchronization message is not for the excepted DN"); |
| | | assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(), |
| | | "The received ADD synchronization message is not for the excepted DN"); |
| | | |
| | | reader = new BrokerReader(broker); |
| | | reader.start(); |
| | | reader = new BrokerReader(broker); |
| | | reader.start(); |
| | | |
| | | long startTime = TimeThread.getTime(); |
| | | int count = TOTAL_MESSAGES; |
| | | long startTime = TimeThread.getTime(); |
| | | int count = TOTAL_MESSAGES; |
| | | |
| | | // Create a number of writer thread that will loop modifying the entry |
| | | List<Thread> writerThreadList = new LinkedList<Thread>(); |
| | | for (int n = 0; n < 1; n++) |
| | | { |
| | | BrokerWriter writer = new BrokerWriter(count); |
| | | writerThreadList.add(writer); |
| | | } |
| | | for (Thread thread : writerThreadList) |
| | | { |
| | | thread.start(); |
| | | } |
| | | // wait for all the threads to finish. |
| | | for (Thread thread : writerThreadList) |
| | | { |
| | | thread.join(); |
| | | } |
| | | // Create a number of writer thread that will loop modifying the entry |
| | | List<Thread> writerThreadList = new LinkedList<Thread>(); |
| | | for (int n = 0; n < 1; n++) |
| | | { |
| | | BrokerWriter writer = new BrokerWriter(count); |
| | | writerThreadList.add(writer); |
| | | } |
| | | for (Thread thread : writerThreadList) |
| | | { |
| | | thread.start(); |
| | | } |
| | | // wait for all the threads to finish. |
| | | for (Thread thread : writerThreadList) |
| | | { |
| | | thread.join(); |
| | | } |
| | | |
| | | long afterSendTime = TimeThread.getTime(); |
| | | long afterSendTime = TimeThread.getTime(); |
| | | |
| | | int rcvCount = reader.getCount(); |
| | | long afterReceiveTime = TimeThread.getTime(); |
| | | int rcvCount = reader.getCount(); |
| | | |
| | | long afterReceiveTime = TimeThread.getTime(); |
| | | |
| | | if (rcvCount != TOTAL_MESSAGES) |
| | | { |
| | | fail("some messages were lost : expected : " +TOTAL_MESSAGES + |
| | | " received : " + rcvCount); |
| | | } |
| | | if (rcvCount != TOTAL_MESSAGES) |
| | | { |
| | | fail("some messages were lost : expected : " +TOTAL_MESSAGES + |
| | | " received : " + rcvCount); |
| | | } |
| | | |
| | | } |
| | | finally { |
| | | DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST); |
| | | broker.stop(); |
| | | DirectoryServer.deregisterMonitorProvider(SYNCHRONIZATION_STRESS_TEST); |
| | | broker.stop(); |
| | | } |
| | | } |
| | | |
| | |
| | | ServerState state = new ServerState(baseDn); |
| | | state.loadState(); |
| | | ChangelogBroker broker = new ChangelogBroker(state, baseDn, |
| | | serverId, 0, 0, 0, 0); |
| | | serverId, 0, 0, 0, 0, 100); |
| | | ArrayList<String> servers = new ArrayList<String>(1); |
| | | servers.add("localhost:8989"); |
| | | broker.start(servers); |
| | |
| | | // We also have a replicated suffix (synchronization domain) |
| | | DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null); |
| | | assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()), |
| | | "Unable to add the syncrhonized server"); |
| | | "Unable to add the synchronized server"); |
| | | entryList.add(synchroServerEntry); |
| | | } |
| | | |
| | |
| | | { |
| | | while (true) |
| | | { |
| | | broker.receive(); |
| | | SynchronizationMessage msg = broker.receive(); |
| | | if (msg == null) |
| | | break; |
| | | count ++; |
| | | } |
| | | } catch (Exception e) { |
| | |
| | | return count; |
| | | try |
| | | { |
| | | this.wait(); |
| | | this.wait(60); |
| | | return count; |
| | | } catch (InterruptedException e) |
| | | { |