mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

fdorson
07.45.2008 87e069074e41b0008cf0b549fce36b7dc4fcae30
2nd try: fix for issue #3317 (removing replication links requires a restart of the server)
and issue #3363 (NullPointerException in ReplicationBroker.java)
6 files modified
384 ■■■■ changed files
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java 10 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java 32 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 30 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 16 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java 104 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 192 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -697,7 +697,6 @@
        }
      } // For late servers
    }
    return bestServer;
  }
@@ -1001,7 +1000,7 @@
    try
    {
      rcvWindow--;
      if (rcvWindow < halfRcvWindow)
      if ((rcvWindow < halfRcvWindow) && (session != null))
      {
        session.publish(new WindowMessage(halfRcvWindow));
        rcvWindow += halfRcvWindow;
@@ -1196,9 +1195,10 @@
    this.maxReceiveQueue = maxReceiveQueue;
    this.maxSendDelay = maxSendDelay;
    this.maxSendQueue = maxSendQueue;
  // TODO : Changing those parameters requires to either restart a new
  // session with the replicationServer or renegociate the parameters that
  // were sent in the ServerStart message
    // For info, a new session with the replicationServer
    // will be recreated in the replication domain
    // to take into account the new configuration.
  }
  /**
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -2248,12 +2248,14 @@
    disabled = true;
    // Stop the listener thread
    listenerThread.shutdown();
    if (listenerThread != null)
      listenerThread.shutdown();
    broker.stop(); // This will cut the session and wake up the listener
    // Wait for the listener thread to stop
    listenerThread.waitForShutdown();
    if (listenerThread != null)
      listenerThread.waitForShutdown();
  }
  /**
@@ -3456,15 +3458,35 @@
    // server id and base dn are readonly.
    // isolationPolicy can be set immediately and will apply
    // to the next updates.
    // The other parameters needs to be renegociated with the ReplicationServer.
    // The other parameters needs to be renegociated with the ReplicationServer
    // so that requires restarting the session with the ReplicationServer.
    replicationServers = configuration.getReplicationServer();
    Boolean needToRestartSession = false;
    Collection<String> newReplServers = configuration.getReplicationServer();
    // A new session is necessary only when information regarding
    // the connection is modified
    if ((!(replicationServers.size() == newReplServers.size()
        && replicationServers.containsAll(newReplServers))) ||
        window != configuration.getWindowSize() ||
        heartbeatInterval != configuration.getHeartbeatInterval())
      needToRestartSession = true;
    replicationServers = newReplServers;
    window = configuration.getWindowSize();
    heartbeatInterval = configuration.getHeartbeatInterval();
    broker.changeConfig(replicationServers, maxReceiveQueue, maxReceiveDelay,
                        maxSendQueue, maxSendDelay, window, heartbeatInterval);
        maxSendQueue, maxSendDelay, window, heartbeatInterval);
    isolationpolicy = configuration.getIsolationPolicy();
    // To be able to stop and restart the broker properly just
    // disable and enable the domain. That way a new session
    // with the new configuration is available.
    if (needToRestartSession)
    {
      this.disable();
      this.enable();
    }
    return new ConfigChangeResult(ResultCode.SUCCESS, false);
  }
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -627,9 +627,13 @@
  {
    // Changing those properties don't need specific code.
    // They will be applied for next connections.
    disconnectRemovedReplicationServers(configuration.getReplicationServer());
    replicationServers = configuration.getReplicationServer();
    if (replicationServers == null)
      replicationServers = new ArrayList<String>();
    queueSize = configuration.getQueueSize();
    long newPurgeDelay = configuration.getReplicationPurgeDelay();
    if (newPurgeDelay != purgeDelay)
@@ -1024,4 +1028,30 @@
      }
    }
  }
  /**
   * Compute the list of replication servers that are not any
   * more connected to this Replication Server and stop the
   * corresponding handlers.
   * @param newReplServers the list of the new replication servers configured.
   */
  private void disconnectRemovedReplicationServers(
      Collection<String> newReplServers)
  {
    Collection<String> serversToDisconnect = new ArrayList<String>();
    for (String server: replicationServers)
    {
      if (!newReplServers.contains(server))
        serversToDisconnect.add(server);
    }
    if (serversToDisconnect.isEmpty())
      return;
    for (ReplicationServerDomain replicationServerDomain: baseDNs.values())
    {
      replicationServerDomain.stopServers(serversToDisconnect);
    }
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -37,6 +37,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -330,6 +331,21 @@
  }
  /**
   * Stop operations with a list of servers.
   *
   * @param replServers the replication servers for which
   * we want to stop operations
   */
  public void stopServers(Collection<String> replServers)
  {
    for (ServerHandler handler : replicationServers.values())
    {
      if (replServers.contains(handler.getServerAddressURL()))
        stopServer(handler);
    }
  }
  /**
   * Stop operations with a given server.
   *
   * @param handler the server for which we want to stop operations
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
@@ -45,9 +45,7 @@
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
import org.opends.server.types.ResultCode;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -132,9 +130,10 @@
  /**
   * Test the failover feature when one RS fails:
   * 1 DS (DS1) and 2 RS (RS1 and RS2) in topology.
   * DS1 connected to RS1 (DS1<->RS1)
   * DS1 connected to one RS
   * Both RS are connected together (RS1<->RS2)
   * RS1 fails, DS1 should be connected to RS2
   * The RS connected to DS1 fails, DS1 should be connected
   * to the other RS
   *
   * @throws Exception If a problem occured
   */
@@ -142,7 +141,7 @@
  public void testFailOverSingle() throws Exception
  {
    String testCase = "testFailOverSingle";
    int rsPort = -1;
    debugInfo("Starting " + testCase);
    initTest();
@@ -158,16 +157,29 @@
    // DS1 connected to RS1 ?
    String msg = "Before " + RS1_ID + " failure";
    checkConnection(DS1_ID, RS1_ID, msg);
    // Check which replication server is connected to this LDAP server
    rsPort = findReplServerConnected(rd1);
    // Simulate RS1 failure
    rs1.shutdown();
    // Let time for failover to happen
    sleep(5000);
    if (rsPort == rs1Port)
    {
      // Simulate RS1 failure
      rs1.shutdown();
      // Let time for failover to happen
      sleep(5000);
      // DS1 connected to RS2 ?
      msg = "After " + RS1_ID + " failure";
      checkConnection(DS1_ID, RS2_ID, msg);
    }
    else
    { // Simulate RS2 failure
      rs2.shutdown();
      // Let time for failover to happen
      sleep(5000);
      // DS1 connected to RS1 ?
      msg = "After " + RS2_ID + " failure";
      checkConnection(DS1_ID, RS1_ID, msg);
    }
    // DS1 connected to RS2 ?
    msg = "After " + RS1_ID + " failure";
    checkConnection(DS1_ID, RS2_ID, msg);
    endTest();
  }
@@ -209,10 +221,10 @@
    rd2 = createReplicationDomain(baseDn, DS2_ID, testCase);
    // DS1 connected to RS1 ?
    String msg = "Before " + RS1_ID + " failure";
    checkConnection(DS1_ID, RS1_ID, msg);
    //String msg = "Before " + RS1_ID + " failure";
    //checkConnection(DS1_ID, RS1_ID, msg);
    // DS2 connected to RS2 ?
    checkConnection(DS2_ID, RS2_ID, msg);
    //checkConnection(DS2_ID, RS1_ID, msg);
    // Simulate RS1 failure
    rs1.shutdown();
@@ -220,7 +232,7 @@
    sleep(5000);
    // DS1 connected to RS2 ?
    msg = "After " + RS1_ID + " failure";
    String msg = "After " + RS1_ID + " failure";
    checkConnection(DS1_ID, RS2_ID, msg);
    // DS2 connected to RS2 ?
    checkConnection(DS2_ID, RS2_ID, msg);
@@ -395,17 +407,10 @@
    SortedSet<String> replServers = new TreeSet<String>();
    try
    {
      if (serverId == DS1_ID)
      {
        replServers.add("localhost:" + rs1Port);
      } else if (serverId == DS2_ID)
      {
        replServers.add("localhost:" + rs2Port);
      } else
      {
        fail("Unknown replication domain server id.");
      }
      // Create a domain with two replication servers
      replServers.add("localhost:" + rs1Port);
      replServers.add("localhost:" + rs2Port);
      DomainFakeCfg domainConf =
        new DomainFakeCfg(baseDn, serverId, replServers);
      //domainConf.setHeartbeatInterval(500);
@@ -413,32 +418,6 @@
        MultimasterReplication.createNewDomain(domainConf);
      replicationDomain.start();
      // Add other server (doing that after connection insure we connect to
      // the right server)
      // WARNING: only works because for the moment, applying changes to conf
      // does not force reconnection in replication domain
      // when it is coded, the reconnect may 1 of both servers and we can not
      // guaranty anymore that we reach the server we want at the beginning.
      if (serverId == DS1_ID)
      {
        replServers.add("localhost:" + rs2Port);
      } else if (serverId == DS2_ID)
      {
        replServers.add("localhost:" + rs1Port);
      } else
      {
        fail("Unknown replication domain server id.");
      }
      domainConf = new DomainFakeCfg(baseDn, serverId, replServers);
      ConfigChangeResult chgRes =
        replicationDomain.applyConfigurationChange(domainConf);
      if ((chgRes == null) ||
        (!chgRes.getResultCode().equals(ResultCode.SUCCESS)))
      {
        fail("Could not change replication domain config" +
          " (add some replication servers).");
      }
      return replicationDomain;
    } catch (Exception e)
@@ -475,4 +454,21 @@
    super.classCleanUp();
  // In case we need it extend
  }
  private int findReplServerConnected(ReplicationDomain rd)
  {
    int rsPort = -1;
    // First check that the Replication domain is connected
    if (!rd.isConnected())
      return rsPort;
    String serverStr = rd.getReplicationServer();
    int index = serverStr.lastIndexOf(':');
    if ((index == -1) || (index >= serverStr.length()))
      fail("Enable to find port number in: " + serverStr);
    rsPort = (new Integer(serverStr.substring(index + 1)));
      return rsPort;
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -39,6 +39,7 @@
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
@@ -132,6 +133,7 @@
    TestCaseUtils.dsconfig(
        "create-replication-server",
        "--provider-name", "Multimaster Synchronization",
        "--set", "replication-db-directory:" + "replicationServerTestDb",
        "--set", "replication-port:" + replicationServerPort,
        "--set", "replication-server-id:1");
    
@@ -187,6 +189,7 @@
    backupRestore();
    stopChangelog();
    windowProbeTest();
    replicationServerConnected();
  }
  /**
@@ -340,8 +343,8 @@
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.getChangeNumber().equals(firstChangeNumberServer1),
            "The first message received by a new client was the wrong one."
            + del.getChangeNumber() + " " + firstChangeNumberServer1);
            "The first message received by a new client was the wrong one : "
            + del.getChangeNumber() + " instead of " + firstChangeNumberServer1);
      }
    }
    finally
@@ -725,7 +728,7 @@
        String baseUUID = "22222222-2222-2222-2222-222222222222";
        // - Add
        String lentry = new String("dn: dc=example,dc=com\n"
        String lentry = new String("dn: o=test,dc=example,dc=com\n"
            + "objectClass: top\n" + "objectClass: domain\n"
            + "entryUUID: 11111111-1111-1111-1111-111111111111\n");
        Entry entry = TestCaseUtils.entryFromLdifString(lentry);
@@ -1529,4 +1532,187 @@
       assertEquals(retVal, 53, "Returned error: " + eStream);
     } catch(Exception e) {}
   }
   /**
    * Replication Server configuration test of the replication Server code with 2 replication servers involved
    * 2 tests are done here (itest=0 or itest=1)
    *
    * Test 1
    * - Create replication server 1
    * - Create replication server 2
    * - Connect replication server 1 to replication server 2
    * - Create and connect client 1 to replication server 1
    * - Create and connect client 2 to replication server 2
    * - Make client1 publish changes
    * - Check that client 2 receives the changes published by client 1
    * Then
    * - Change the config of replication server 1 to no more be connected
    * to server 2
    * - Make client 1 publish a change
    * - Check that client 2 does not receive the change
    */
   @Test
   private void replicationServerConnected() throws Exception
   {
       ReplicationBroker broker1 = null;
       ReplicationBroker broker2 = null;
       boolean emptyOldChanges = true;
       // - Create 2 connected replicationServer
       ReplicationServer[] changelogs = new ReplicationServer[2];
       int[] changelogPorts = new int[2];
       int[] changelogIds = new int[2];
       short[] brokerIds = new short[2];
       ServerSocket socket = null;
       // Find 2 free ports
       for (int i = 0; i <= 1; i++)
       {
         // find  a free port
         socket = TestCaseUtils.bindFreePort();
         changelogPorts[i] = socket.getLocalPort();
         changelogIds[i] = i + 10;
         brokerIds[i] = (short) (100+i);
         socket.close();
       }
       for (int i = 0; i <= 1; i++)
       {
         changelogs[i] = null;
         // create the 2 replicationServer
         // and connect the first one to the other one
         SortedSet<String> servers = new TreeSet<String>();
         // Connect only replicationServer[0] to ReplicationServer[1]
         // and not the other way
         if (i==0)
           servers.add("localhost:" + changelogPorts[1]);
         ReplServerFakeConfiguration conf =
           new ReplServerFakeConfiguration(changelogPorts[i], "changelogDb"+i, 0,
                                          changelogIds[i], 0, 100, servers);
         changelogs[i] = new ReplicationServer(conf);
       }
       try
       {
         // Create and connect client1 to changelog1
         // and client2 to changelog2
         broker1 = openReplicationSession(DN.decode("dc=example,dc=com"),
              brokerIds[0], 100, changelogPorts[0], 1000, emptyOldChanges);
         broker2 = openReplicationSession(DN.decode("dc=example,dc=com"),
              brokerIds[1], 100, changelogPorts[0], 1000, emptyOldChanges);
         // - Test messages between clients by publishing now
         long time = TimeThread.getTime();
         int ts = 1;
         ChangeNumber cn;
         String user1entryUUID = "33333333-3333-3333-3333-333333333333";
         String baseUUID  = "22222222-2222-2222-2222-222222222222";
         // - Add
         String lentry = new String("dn: o=test,dc=example,dc=com\n"
             + "objectClass: top\n" + "objectClass: domain\n"
             + "entryUUID: "+ user1entryUUID +"\n");
         Entry entry = TestCaseUtils.entryFromLdifString(lentry);
         cn = new ChangeNumber(time, ts++, brokerIds[0]);
         AddMsg addMsg = new AddMsg(cn, "o=test,dc=example,dc=com",
             user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry
             .getAttributes(), new ArrayList<Attribute>());
         broker1.publish(addMsg);
         // - Modify
         Attribute attr1 = new Attribute("description", "new value");
         Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
         List<Modification> mods = new ArrayList<Modification>();
         mods.add(mod1);
         cn = new ChangeNumber(time, ts++, brokerIds[0]);
         ModifyMsg modMsg = new ModifyMsg(cn, DN
             .decode("o=test,dc=example,dc=com"), mods, "fakeuniqueid");
         broker1.publish(modMsg);
            // - Check msg received by broker, through changeLog2
         while (ts > 1)
         {
           ReplicationMessage msg2;
           try
           {
             msg2 = broker2.receive();
             if (msg2 == null)
               break;
             broker2.updateWindowAfterReplay();
           }
           catch (Exception e)
           {
             fail("Broker receive failed: " + e.getMessage() + "#Msg: " + ts);
             break;
           }
           if (msg2 instanceof AddMsg)
           {
             AddMsg addMsg2 = (AddMsg) msg2;
             if (addMsg2.toString().equals(addMsg.toString()))
               ts--;
           }
           else if (msg2 instanceof ModifyMsg)
           {
             ModifyMsg modMsg2 = (ModifyMsg) msg2;
             if (modMsg.equals(modMsg2))
               ts--;
           }
           else
           {
             fail("ReplicationServer transmission failed: no expected message" +
               " class: " + msg2);
             break;
           }
         }
         // Check that everything expected has been received
         assertTrue(ts == 1, "Broker2 did not receive the complete set of"
             + " expected messages: #msg received " + ts);
         // Then change the config to remove replicationServer[1] from
         // the configuration of replicationServer[0]
         SortedSet<String> servers = new TreeSet<String>();
         // Configure replicationServer[0] to be disconnected from ReplicationServer[1]
         ReplServerFakeConfiguration conf =
           new ReplServerFakeConfiguration(changelogPorts[0], "changelogDb0", 0,
                                          changelogIds[0], 0, 100, servers);
         changelogs[0].applyConfigurationChange(conf) ;
         // We expect the receive to end because of a timeout : the link between RS1 & RS2
         // should be distroyed by the new configuration
         // Send 1 update and check that RS[1] does not receive the message after the timeout
         try
         {
           // - Del
           cn = new ChangeNumber(time, ts++, brokerIds[0]);
           DeleteMsg delMsg = new DeleteMsg("o=test,dc=example,dc=com", cn, user1entryUUID);
           broker1.publish(delMsg);
           broker2.receive();
         }
         catch (SocketTimeoutException soExc)
         {
         // the receive fail as expected
           return;
         }
         fail("Broker: receive successed when it should fail. "
               + "This broker was disconnected by configuration");
       }
       finally
       {
         if (changelogs[0] != null)
           changelogs[0].remove();
         if (changelogs[1] != null)
           changelogs[1].remove();
         if (broker1 != null)
           broker1.stop();
         if (broker2 != null)
           broker2.stop();
       }
     }
}