| | |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.protocol.UpdateMessage; |
| | | import org.opends.server.replication.server.ChangelogDB.ChangelogCursor; |
| | | import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor; |
| | | |
| | | import com.sleepycat.je.DatabaseException; |
| | | |
| | | /** |
| | | * This class is used for managing the changelog database for each servers |
| | | * in the topology. |
| | | * This class is used for managing the replicationServer database for each |
| | | * server in the topology. |
| | | * It is responsible for efficiently saving the updates that are received from |
| | | * each master server into stable storage. |
| | | * This class is also able to generate a ChangelogIterator that can be |
| | | * This class is also able to generate a ReplicationIterator that can be |
| | | * used to read all changes from a given ChangeNumber. |
| | | * |
| | | * This class publishes some monitoring information below cn=monitor. |
| | |
| | | // This queue holds all the updates not yet saved to stable storage |
| | | // it is only used as a temporary placeholder so that the write |
| | | // in the stable storage can be grouped for efficiency reasons. |
| | | // it is never read back by changelog threads that are responsible |
| | | // for pushing the changes to other changelog server or to LDAP server |
| | | // it is never read back by replicationServer threads that are responsible |
| | | // for pushing the changes to other replication server or to LDAP server |
| | | private LinkedList<UpdateMessage> msgQueue = new LinkedList<UpdateMessage>(); |
| | | private ChangelogDB db; |
| | | private ReplicationDB db; |
| | | private ChangeNumber firstChange = null; |
| | | private ChangeNumber lastChange = null; |
| | | private short serverId; |
| | |
| | | * |
| | | * @param id Identifier of the DB. |
| | | * @param baseDn the baseDn for which this DB was created. |
| | | * @param changelog The Changelog that creates this dbHandler. |
| | | * @param dbenv the Database Env to use to create the Changelog DB. |
| | | * @param replicationServer The ReplicationServer that creates this dbHandler. |
| | | * @param dbenv the Database Env to use to create the ReplicationServer DB. |
| | | * @throws DatabaseException If a database problem happened |
| | | */ |
| | | public DbHandler(short id, DN baseDn, Changelog changelog, |
| | | ChangelogDbEnv dbenv) |
| | | public DbHandler(short id, DN baseDn, ReplicationServer replicationServer, |
| | | ReplicationDbEnv dbenv) |
| | | throws DatabaseException |
| | | { |
| | | this.serverId = id; |
| | | this.baseDn = baseDn; |
| | | this.trimage = changelog.getTrimage(); |
| | | db = new ChangelogDB(id, baseDn, changelog, dbenv); |
| | | this.trimage = replicationServer.getTrimage(); |
| | | db = new ReplicationDB(id, baseDn, replicationServer, dbenv); |
| | | firstChange = db.readFirstChange(); |
| | | lastChange = db.readLastChange(); |
| | | thread = new DirectoryThread(this, "changelog db " + id + " " + baseDn); |
| | | thread = new DirectoryThread(this, |
| | | "Replication Server db " + id + " " + baseDn); |
| | | thread.start(); |
| | | |
| | | DirectoryServer.deregisterMonitorProvider( |
| | |
| | | } |
| | | |
| | | /** |
| | | * Generate a new ChangelogIterator that allows to browse the db |
| | | * Generate a new ReplicationIterator that allows to browse the db |
| | | * managed by this dbHandler and starting at the position defined |
| | | * by a given changeNumber. |
| | | * |
| | | * @param changeNumber The position where the iterator must start. |
| | | * |
| | | * @return a new ChangelogIterator that allows to browse the db |
| | | * @return a new ReplicationIterator that allows to browse the db |
| | | * managed by this dbHandler and starting at the position defined |
| | | * by a given changeNumber. |
| | | * |
| | |
| | | * @throws Exception If there is no other change to push after change |
| | | * with changeNumber number. |
| | | */ |
| | | public ChangelogIterator generateIterator(ChangeNumber changeNumber) |
| | | public ReplicationIterator generateIterator(ChangeNumber changeNumber) |
| | | throws DatabaseException, Exception |
| | | { |
| | | /* |
| | |
| | | flush(); |
| | | } |
| | | |
| | | return new ChangelogIterator(serverId, db, changeNumber); |
| | | return new ReplicationIterator(serverId, db, changeNumber); |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | /** |
| | | * Run method for this class. |
| | | * Periodically Flushes the ChangelogCache from memory to the stable storage |
| | | * Periodically Flushes the ReplicationCache from memory to the stable storage |
| | | * and trims the old updates. |
| | | */ |
| | | public void run() |
| | |
| | | } |
| | | |
| | | /** |
| | | * Flush old change information from this changelog database. |
| | | * Flush old change information from this replicationServer database. |
| | | * @throws DatabaseException In case of database problem. |
| | | */ |
| | | private void trim() throws DatabaseException, Exception |
| | |
| | | /* the trim is done by group in order to save some CPU and IO bandwidth |
| | | * start the transaction then do a bunch of remove then commit |
| | | */ |
| | | ChangelogCursor cursor; |
| | | ReplServerDBCursor cursor; |
| | | |
| | | cursor = db.openDeleteCursor(); |
| | | |
| | |
| | | { |
| | | private DbMonitorProvider() |
| | | { |
| | | super("Changelog Database"); |
| | | super("ReplicationServer Database"); |
| | | } |
| | | |
| | | /** |
| | |
| | | public ArrayList<Attribute> getMonitorData() |
| | | { |
| | | ArrayList<Attribute> attributes = new ArrayList<Attribute>(); |
| | | attributes.add(new Attribute("changelog-database", |
| | | attributes.add(new Attribute("replicationServer-database", |
| | | String.valueOf(serverId))); |
| | | attributes.add(new Attribute("base-dn", baseDn.toString())); |
| | | if (firstChange != null) |
| | |
| | | @Override |
| | | public String getMonitorInstanceName() |
| | | { |
| | | return "Changelog database " + baseDn.toString() + |
| | | return "ReplicationServer database " + baseDn.toString() + |
| | | " " + String.valueOf(serverId); |
| | | } |
| | | |