| | |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.core.ModifyDNOperationBasis; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.protocols.ldap.LDAPFilter; |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.protocols.ldap.LDAPControl; |
| | | import org.opends.server.protocols.ldap.LDAPFilter; |
| | | import org.opends.server.replication.ReplicationTestCase; |
| | | import org.opends.server.replication.service.ReplicationBroker; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | |
| | | /** |
| | | * Tests for the replicationServer code. |
| | | */ |
| | | |
| | | public class ReplicationServerTest extends ReplicationTestCase |
| | | { |
| | | // The tracer object for the debug logger |
| | |
| | | server1.publish(msg); |
| | | ReplicationMsg msg2 = server2.receive(); |
| | | server2.updateWindowAfterReplay(); |
| | | if (msg2 instanceof DeleteMsg) |
| | | { |
| | | DeleteMsg del = (DeleteMsg) msg2; |
| | | assertTrue(del.toString().equals(msg.toString()), |
| | | "ReplicationServer basic : incorrect message body received."); |
| | | } |
| | | else |
| | | fail("ReplicationServer basic : incorrect message type received: " + |
| | | msg2.getClass().toString() + ": content: " + msg2.toString()); |
| | | assertDeleteMsgBodyEquals(msg, msg2); |
| | | |
| | | /* |
| | | * Send and receive a second Delete Msg |
| | |
| | | server1.publish(msg); |
| | | msg2 = server2.receive(); |
| | | server2.updateWindowAfterReplay(); |
| | | if (msg2 instanceof DeleteMsg) |
| | | { |
| | | DeleteMsg del = (DeleteMsg) msg2; |
| | | assertTrue(del.toString().equals(msg.toString()), |
| | | "ReplicationServer basic : incorrect message body received."); |
| | | } |
| | | else |
| | | fail("ReplicationServer basic : incorrect message type received: " + |
| | | msg2.getClass().toString() + ": content: " + msg2.toString()); |
| | | assertDeleteMsgBodyEquals(msg, msg2); |
| | | |
| | | /* |
| | | * Send and receive a Delete Msg from server 2 to server 1 |
| | |
| | | server2.publish(msg); |
| | | msg2 = server1.receive(); |
| | | server1.updateWindowAfterReplay(); |
| | | if (msg2 instanceof DeleteMsg) |
| | | { |
| | | DeleteMsg del = (DeleteMsg) msg2; |
| | | assertTrue(del.toString().equals(msg.toString()), |
| | | "ReplicationServer basic : incorrect message body received."); |
| | | } |
| | | else |
| | | fail("ReplicationServer basic : incorrect message type received: " + |
| | | msg2.getClass().toString() + ": content: " + msg2.toString()); |
| | | assertDeleteMsgBodyEquals(msg, msg2); |
| | | |
| | | /* |
| | | * Send and receive a second Delete Msg |
| | |
| | | server2.publish(msg); |
| | | msg2 = server1.receive(); |
| | | server1.updateWindowAfterReplay(); |
| | | if (msg2 instanceof DeleteMsg) |
| | | { |
| | | DeleteMsg del = (DeleteMsg) msg2; |
| | | assertTrue(del.toString().equals(msg.toString()), |
| | | "ReplicationServer basic : incorrect message body received."); |
| | | } |
| | | else |
| | | fail("ReplicationServer basic : incorrect message type received: " + |
| | | msg2.getClass().toString() + ": content: " + msg2.toString()); |
| | | assertDeleteMsgBodyEquals(msg, msg2); |
| | | |
| | | debugInfo("Ending changelogBasic"); |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | private void assertDeleteMsgBodyEquals(DeleteMsg msg, ReplicationMsg msg2) |
| | | { |
| | | if (msg2 instanceof DeleteMsg) |
| | | { |
| | | DeleteMsg del = (DeleteMsg) msg2; |
| | | assertEquals(del.toString(), msg.toString(), |
| | | "ReplicationServer basic : incorrect message body received."); |
| | | } |
| | | else |
| | | fail("ReplicationServer basic : incorrect message type received: " + |
| | | msg2.getClass().toString() + ": content: " + msg2.toString()); |
| | | } |
| | | |
| | | /** |
| | | * Test that a new client see the change that was sent in the |
| | | * previous test. |
| | |
| | | |
| | | ReplicationMsg msg2 = broker.receive(); |
| | | broker.updateWindowAfterReplay(); |
| | | if (!(msg2 instanceof DeleteMsg)) |
| | | fail("ReplicationServer basic transmission failed:" + msg2); |
| | | else |
| | | { |
| | | DeleteMsg del = (DeleteMsg) msg2; |
| | | assertTrue(del.getChangeNumber().equals(firstChangeNumberServer1), |
| | | "The first message received by a new client was the wrong one : " |
| | | + del.getChangeNumber() + " instead of " + firstChangeNumberServer1); |
| | | } |
| | | assertDeleteMsgChangeNumberEquals(msg2, firstChangeNumberServer1, |
| | | "first"); |
| | | debugInfo("Ending newClient"); |
| | | } |
| | | finally |
| | |
| | | |
| | | ReplicationMsg msg2 = broker.receive(); |
| | | broker.updateWindowAfterReplay(); |
| | | if (!(msg2 instanceof DeleteMsg)) |
| | | { |
| | | fail("ReplicationServer basic transmission failed:" + msg2); |
| | | } |
| | | else |
| | | { |
| | | DeleteMsg del = (DeleteMsg) msg2; |
| | | assertTrue(del.getChangeNumber().equals(nextChangeNumber), |
| | | "The second message received by a new client was the wrong one." |
| | | + del.getChangeNumber() + " " + nextChangeNumber); |
| | | } |
| | | assertDeleteMsgChangeNumberEquals(msg2, nextChangeNumber, "second"); |
| | | } |
| | | finally |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Asserts that the change number for the passed in message matches the |
| | | * supplied change number. |
| | | * |
| | | * @param msg |
| | | * @param nextChangeNumber |
| | | * @param msgNumber |
| | | */ |
| | | private void assertDeleteMsgChangeNumberEquals(ReplicationMsg msg, |
| | | ChangeNumber nextChangeNumber, String msgNumber) |
| | | { |
| | | if (msg instanceof DeleteMsg) |
| | | { |
| | | DeleteMsg del = (DeleteMsg) msg; |
| | | assertEquals(del.getChangeNumber(), nextChangeNumber, "The " + msgNumber |
| | | + " message received by a new client was the wrong one."); |
| | | } |
| | | else |
| | | { |
| | | fail("ReplicationServer basic transmission failed:" + msg); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Test that a client that has already seen the first change now see the |
| | | * second change |
| | | */ |
| | |
| | | client[i] = new BrokerReader(clientBroker[i], TOTAL_MSG); |
| | | } |
| | | |
| | | for (int i =0; i< CLIENT_THREADS; i++) |
| | | for (BrokerReader c : client) |
| | | { |
| | | client[i].start(); |
| | | c.start(); |
| | | } |
| | | reader.start(); |
| | | |
| | |
| | | { |
| | | server.stop(); |
| | | } |
| | | for (int i =0; i< CLIENT_THREADS; i++) |
| | | for (BrokerReader c : client) |
| | | { |
| | | if (client[i] != null) |
| | | if (c != null) |
| | | { |
| | | client[i].join(10000); |
| | | client[i].interrupt(); |
| | | c.join(10000); |
| | | c.interrupt(); |
| | | } |
| | | |
| | | } |
| | | for (int i =0; i< CLIENT_THREADS; i++) |
| | | for (ReplicationBroker broker : clientBroker) |
| | | { |
| | | if (clientBroker[i] != null) |
| | | clientBroker[i].stop(); |
| | | if (broker != null) |
| | | broker.stop(); |
| | | } |
| | | |
| | | assertTrue(reader.errDetails==null, |
| | | reader.exc + " " + reader.errDetails); |
| | | assertNull(reader.errDetails, reader.exc + " " + reader.errDetails); |
| | | } |
| | | } |
| | | |
| | |
| | | reader[i] = new BrokerReader(broker[i], (TOTAL_MSG/THREADS)*(THREADS-1)); |
| | | } |
| | | |
| | | for (int i =0; i< THREADS; i++) |
| | | for (BrokerWriter p : producer) |
| | | { |
| | | producer[i].start(); |
| | | p.start(); |
| | | } |
| | | |
| | | for (int i =0; i< THREADS; i++) |
| | | for (BrokerReader r : reader) |
| | | { |
| | | reader[i].start(); |
| | | r.start(); |
| | | } |
| | | debugInfo("multipleWriterMultipleReader produces and readers started"); |
| | | } |
| | | finally |
| | | { |
| | | debugInfo("multipleWriterMultipleReader wait producers end"); |
| | | for (int i = 0; i< THREADS; i++) |
| | | for (BrokerWriter p : producer) |
| | | { |
| | | if (producer[i] != null) |
| | | if (p != null) |
| | | { |
| | | producer[i].join(10000); |
| | | p.join(10000); |
| | | // kill the thread in case it is not yet stopped. |
| | | producer[i].interrupt(); |
| | | p.interrupt(); |
| | | } |
| | | } |
| | | debugInfo("multipleWriterMultipleReader producers ended, now wait readers end"); |
| | | for (int i = 0; i< THREADS; i++) |
| | | for (BrokerReader r : reader) |
| | | { |
| | | if (reader[i] != null) |
| | | if (r != null) |
| | | { |
| | | reader[i].join(10000); |
| | | r.join(10000); |
| | | // kill the thread in case it is not yet stopped. |
| | | reader[i].interrupt(); |
| | | r.interrupt(); |
| | | } |
| | | } |
| | | debugInfo("multipleWriterMultipleReader reader's ended, now stop brokers"); |
| | | for (int i = 0; i< THREADS; i++) |
| | | for (ReplicationBroker b : broker) |
| | | { |
| | | if (broker[i] != null) |
| | | broker[i].stop(); |
| | | if (b != null) |
| | | b.stop(); |
| | | } |
| | | debugInfo("multipleWriterMultipleReader brokers stopped"); |
| | | |
| | | for (int i = 0; i< THREADS; i++) |
| | | for (BrokerReader r : reader) |
| | | { |
| | | if (reader[i] != null) |
| | | assertTrue(reader[i].errDetails==null, |
| | | reader[i].exc + " " + reader[i].errDetails); |
| | | if (r != null) |
| | | assertNull(r.errDetails, r.exc + " " + r.errDetails); |
| | | } |
| | | } |
| | | debugInfo("Ending multipleWriterMultipleReader"); |
| | |
| | | } |
| | | } |
| | | // Check that everything expected has been received |
| | | assertTrue(ts == 1, "Broker2 did not receive the complete set of" |
| | | assertEquals(ts, 1, "Broker2 did not receive the complete set of" |
| | | + " expected messages: #msg received " + ts); |
| | | debugInfo("Ending changelogChaining"); |
| | | } |
| | |
| | | |
| | | replicationServer.clearDb(); |
| | | |
| | | LDIFWriter ldifWriter = null; |
| | | ByteArrayOutputStream stream = new ByteArrayOutputStream(); |
| | | LDIFExportConfig exportConfig = new LDIFExportConfig(stream); |
| | | try |
| | | { |
| | | ldifWriter = new LDIFWriter(exportConfig); |
| | | } |
| | | catch (Exception e){} |
| | | LDIFWriter ldifWriter = new LDIFWriter(exportConfig); |
| | | |
| | | debugInfo("Create broker"); |
| | | |
| | |
| | | DereferencePolicy.NEVER_DEREF_ALIASES, |
| | | 0, 0, false, filter, null, null); |
| | | internalSearch.run(); |
| | | assertTrue(internalSearch.getResultCode() == ResultCode.SUCCESS); |
| | | assertEquals(internalSearch.getResultCode(), ResultCode.SUCCESS); |
| | | assertTrue(internalSearch.getSearchEntries().isEmpty()); |
| | | |
| | | // General search |
| | |
| | | SearchScope.WHOLE_SUBTREE, |
| | | LDAPFilter.decode("(changetype=add)")); |
| | | assertEquals(op.getResultCode(), ResultCode.SUCCESS); |
| | | assertTrue(op.getSearchEntries().size() == 2); |
| | | assertEquals(op.getSearchEntries().size(), 2); |
| | | |
| | | op = connection.processSearch( |
| | | ByteString.valueOf("dc=replicationChanges"), |
| | | SearchScope.WHOLE_SUBTREE, |
| | | LDAPFilter.decode("(changetype=modify)")); |
| | | assertEquals(op.getResultCode(), ResultCode.SUCCESS); |
| | | assertTrue(op.getSearchEntries().size() == 1); |
| | | assertEquals(op.getSearchEntries().size(), 1); |
| | | |
| | | op = connection.processSearch( |
| | | ByteString.valueOf("dc=replicationChanges"), |
| | | SearchScope.WHOLE_SUBTREE, |
| | | LDAPFilter.decode("(changetype=moddn)")); |
| | | assertEquals(op.getResultCode(), ResultCode.SUCCESS); |
| | | assertTrue(op.getSearchEntries().size() == 1); |
| | | assertEquals(op.getSearchEntries().size(), 1); |
| | | |
| | | op = connection.processSearch( |
| | | ByteString.valueOf("dc=replicationChanges"), |
| | | SearchScope.WHOLE_SUBTREE, |
| | | LDAPFilter.decode("(changetype=delete)")); |
| | | assertEquals(op.getResultCode(), ResultCode.SUCCESS); |
| | | assertTrue(op.getSearchEntries().size() == 1); |
| | | assertEquals(op.getSearchEntries().size(), 1); |
| | | |
| | | debugInfo("Query / filter based on objectclass"); |
| | | op = connection.processSearch( |
| | |
| | | } |
| | | } |
| | | // Check that everything expected has been received |
| | | assertTrue(ts == 1, "Broker2 did not receive the complete set of" |
| | | assertEquals(ts, 1, "Broker2 did not receive the complete set of" |
| | | + " expected messages: #msg received " + ts); |
| | | |
| | | // Then change the config to remove replicationServer[1] from |