| | |
| | | |
| | | import com.sleepycat.je.*; |
| | | |
| | | import static com.sleepycat.je.LockMode.*; |
| | | import static com.sleepycat.je.OperationStatus.*; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | |
| | | public class ReplicationDbEnv |
| | | { |
| | | private Environment dbEnvironment; |
| | | private Database stateDb; |
| | | private Database changelogStateDb; |
| | | private ReplicationServer replicationServer; |
| | | private static final String GENERATION_ID_TAG = "GENID"; |
| | | private static final String FIELD_SEPARATOR = " "; |
| | |
| | | * read buffers. This will result in more scalable checkpointer and |
| | | * cleaner performance. |
| | | */ |
| | | envConfig.setConfigParam("je.cleaner.lookAheadCacheSize", |
| | | String.valueOf(2 * 1024 * 1024)); |
| | | envConfig.setConfigParam("je.log.iteratorReadSize", |
| | | String.valueOf(2 * 1024 * 1024)); |
| | | envConfig.setConfigParam("je.log.faultReadSize", |
| | | String.valueOf(4 * 1024)); |
| | | envConfig.setConfigParam("je.cleaner.lookAheadCacheSize", mb(2)); |
| | | envConfig.setConfigParam("je.log.iteratorReadSize", mb(2)); |
| | | envConfig.setConfigParam("je.log.faultReadSize", kb(4)); |
| | | |
| | | /* |
| | | * The cache size must be bigger in order to accommodate the larger |
| | | * buffers - see OPENDJ-943. |
| | | */ |
| | | envConfig.setConfigParam("je.maxMemory", |
| | | String.valueOf(16 * 1024 * 1024)); |
| | | envConfig.setConfigParam("je.maxMemory", mb(16)); |
| | | } |
| | | else |
| | | { |
| | |
| | | * Use 5M so that the replication can be used with 64M total for the |
| | | * JVM. |
| | | */ |
| | | envConfig.setConfigParam("je.maxMemory", |
| | | String.valueOf(5 * 1024 * 1024)); |
| | | envConfig.setConfigParam("je.maxMemory", mb(5)); |
| | | } |
| | | |
| | | // Since records are always added at the end of the Replication log and |
| | |
| | | * the topology. The database "changelogstate" is used to store the list |
| | | * of all the servers that have been seen in the past. |
| | | */ |
| | | stateDb = openDatabase("changelogstate"); |
| | | changelogStateDb = openDatabase("changelogstate"); |
| | | } |
| | | catch (RuntimeException e) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | private String kb(int sizeInKb) |
| | | { |
| | | return String.valueOf(sizeInKb * 1024); |
| | | } |
| | | |
| | | private String mb(int sizeInMb) |
| | | { |
| | | return String.valueOf(sizeInMb * 1024 * 1024); |
| | | } |
| | | |
| | | private Database openDatabase(String databaseName) throws RuntimeException |
| | | { |
| | | final DatabaseConfig dbConfig = new DatabaseConfig(); |
| | |
| | | * |
| | | * @throws ChangelogException in case of underlying Exception |
| | | */ |
| | | public void start() throws ChangelogException |
| | | public void initializeFromChangelogStateDB() throws ChangelogException |
| | | { |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | Cursor cursor = stateDb.openCursor(null, null); |
| | | Cursor cursor = changelogStateDb.openCursor(null, null); |
| | | |
| | | try |
| | | { |
| | | OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | String stringData = toString(data.getData()); |
| | | final String stringData = toString(data.getData()); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() |
| | | + " Read (tag/baseDn/generationId) OR (serverId/baseDN): " |
| | | + stringData); |
| | | debug("read (" + GENERATION_ID_TAG + " generationId baseDn) OR " |
| | | + "(serverId baseDN): " + stringData); |
| | | |
| | | String[] str = stringData.split(FIELD_SEPARATOR, 3); |
| | | final String[] str = stringData.split(FIELD_SEPARATOR, 3); |
| | | if (str[0].equals(GENERATION_ID_TAG)) |
| | | { |
| | | long generationId = toLong(str[1]); |
| | | String baseDn = str[2]; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() |
| | | + " Has read baseDn=" + baseDn + " generationId=" |
| | | + generationId); |
| | | debug("has read baseDn=" + baseDn + " generationId=" +generationId); |
| | | |
| | | replicationServer.initDomainGenerationID(baseDn, generationId); |
| | | } |
| | |
| | | String baseDn = str[1]; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() |
| | | + " Has read: baseDn=" + baseDn + " serverId=" + serverId); |
| | | debug("has read: baseDn=" + baseDn + " serverId=" + serverId); |
| | | |
| | | replicationServer.addServerIdToDomain(serverId, baseDn); |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Finds or creates the database used to store changes from the server |
| | | * with the given serverId and the given baseDn. |
| | | * |
| | | * @param serverId The server id that identifies the server. |
| | | * @param baseDn The baseDn that identifies the domain. |
| | | * @param generationId The generationId associated to this domain. |
| | | * @return the Database. |
| | | * @throws ChangelogException in case of underlying Exception. |
| | | */ |
| | | public Database getOrAddDb(int serverId, String baseDn, long generationId) |
| | | throws ChangelogException |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("ReplicationDbEnv.getOrAddDb() " + |
| | | serverId + " " + baseDn + " " + generationId); |
| | | try |
| | | { |
| | | final String serverIdKey = serverId + FIELD_SEPARATOR + baseDn; |
| | | |
| | | // Opens the database for the changes received from this server |
| | | // on this domain. Create it if it does not already exist. |
| | | Database db = openDatabase(serverIdKey); |
| | | |
| | | // Creates the record serverId/domain base Dn in the stateDb |
| | | // if it does not already exist. |
| | | putInStateDBIfNotExist(serverIdKey, serverIdKey); |
| | | |
| | | // Creates the record domain base Dn/ generationId in the stateDb |
| | | // if it does not already exist. |
| | | final String genIdKey = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn; |
| | | final String genIdData = GENERATION_ID_TAG |
| | | + FIELD_SEPARATOR + generationId |
| | | + FIELD_SEPARATOR + baseDn; |
| | | putInStateDBIfNotExist(genIdKey, genIdData); |
| | | return db; |
| | | } |
| | | catch (RuntimeException e) |
| | | { |
| | | throw new ChangelogException(e); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // can't happen |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | private void putInStateDBIfNotExist(String keyString, String dataString) |
| | | throws UnsupportedEncodingException, RuntimeException |
| | | private byte[] toBytes(String s) |
| | | { |
| | | byte[] byteId = keyString.getBytes("UTF-8"); |
| | | byte[] dataByteId = dataString.getBytes("UTF-8"); |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | try |
| | | { |
| | | return s.getBytes("UTF-8"); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // can't happen |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Finds or creates the database used to store changes from the server with |
| | | * the given serverId and the given baseDn. |
| | | * |
| | | * @param serverId |
| | | * The server id that identifies the server. |
| | | * @param baseDn |
| | | * The baseDn that identifies the domain. |
| | | * @param generationId |
| | | * The generationId associated to this domain. |
| | | * @return the Database. |
| | | * @throws ChangelogException |
| | | * in case of underlying Exception. |
| | | */ |
| | | public Database getOrAddDb(int serverId, String baseDn, long generationId) |
| | | throws ChangelogException |
| | | { |
| | | if (debugEnabled()) |
| | | debug("ReplicationDbEnv.getOrAddDb(" + serverId + ", " + baseDn + ", " |
| | | + generationId + ")"); |
| | | try |
| | | { |
| | | // JNR: redundant info is stored between the key and data down below. |
| | | // It is probably ok since "changelogstate" DB does not receive a high |
| | | // volume of inserts. |
| | | final String serverIdToBaseDn = buildServerIdKey(baseDn, serverId); |
| | | |
| | | // Opens the DB for the changes received from this server on this domain. |
| | | Database db = openDatabase(serverIdToBaseDn); |
| | | |
| | | putInChangelogStateDBIfNotExist(serverIdToBaseDn, serverIdToBaseDn); |
| | | putInChangelogStateDBIfNotExist(buildGenIdKey(baseDn), |
| | | buildGenIdData(baseDn, generationId)); |
| | | return db; |
| | | } |
| | | catch (RuntimeException e) |
| | | { |
| | | throw new ChangelogException(e); |
| | | } |
| | | } |
| | | |
| | | private String buildGenIdKey(String baseDn) |
| | | { |
| | | return GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn; |
| | | } |
| | | |
| | | private String buildServerIdKey(String baseDn, int serverId) |
| | | { |
| | | return serverId + FIELD_SEPARATOR + baseDn; |
| | | } |
| | | |
| | | private String buildGenIdData(String baseDn, long generationId) |
| | | { |
| | | return GENERATION_ID_TAG + FIELD_SEPARATOR + generationId + FIELD_SEPARATOR |
| | | + baseDn; |
| | | } |
| | | |
| | | private void putInChangelogStateDBIfNotExist(String keyString, |
| | | String dataString) throws RuntimeException |
| | | { |
| | | DatabaseEntry key = new DatabaseEntry(toBytes(keyString)); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if (status == OperationStatus.NOTFOUND) |
| | | if (changelogStateDb.get(null, key, data, DEFAULT) == NOTFOUND) |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try |
| | | { |
| | | data.setData(dataByteId); |
| | | data.setData(toBytes(dataString)); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Created in the state Db record key=[" + keyString |
| | | debug("putting record in the changelogstate Db key=[" + keyString |
| | | + "] value=[" + dataString + "]"); |
| | | stateDb.put(txn, key, data); |
| | | changelogStateDb.put(txn, key, data); |
| | | txn.commit(Durability.COMMIT_WRITE_NO_SYNC); |
| | | } |
| | | catch (DatabaseException dbe) |
| | |
| | | { |
| | | try |
| | | { |
| | | stateDb.close(); |
| | | changelogStateDb.close(); |
| | | } catch (DatabaseException e) |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | |
| | | */ |
| | | public void clearGenerationId(String baseDn) |
| | | { |
| | | String methodInvocation = "clearGenerationId(baseDN=" + baseDn + ")"; |
| | | |
| | | String key = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn; |
| | | OperationStatus status = deleteFromStateDB(key, methodInvocation); |
| | | if (status == OperationStatus.SUCCESS || status == OperationStatus.KEYEXIST) |
| | | { |
| | | // TODO : should have a better error logging |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " |
| | | + this.replicationServer.getMonitorInstanceName() |
| | | + methodInvocation + " failed " + status); |
| | | } |
| | | deleteFromChangelogStateDB(buildGenIdKey(baseDn), |
| | | "clearGenerationId(baseDN=" + baseDn + ")"); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void clearServerId(String baseDn, int serverId) |
| | | { |
| | | String key = serverId + FIELD_SEPARATOR + baseDn; |
| | | deleteFromStateDB(key, "clearServerId(baseDN=" + baseDn + " , serverId=" |
| | | + serverId + ")"); |
| | | deleteFromChangelogStateDB(buildServerIdKey(baseDn, serverId), |
| | | "clearServerId(baseDN=" + baseDn + " , serverId=" + serverId + ")"); |
| | | } |
| | | |
| | | private OperationStatus deleteFromStateDB(String keyString, |
| | | private void deleteFromChangelogStateDB(String keyString, |
| | | String methodInvocation) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() |
| | | + " " + methodInvocation); |
| | | debug(methodInvocation + " starting"); |
| | | |
| | | try |
| | | { |
| | | final byte[] byteId = keyString.getBytes("UTF-8"); |
| | | final DatabaseEntry key = new DatabaseEntry(); |
| | | key.setData(byteId); |
| | | final DatabaseEntry key = new DatabaseEntry(toBytes(keyString)); |
| | | final DatabaseEntry data = new DatabaseEntry(); |
| | | OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT); |
| | | if (status != OperationStatus.NOTFOUND) |
| | | OperationStatus status = changelogStateDb.get(null, key, data, DEFAULT); |
| | | if (status == SUCCESS) |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try |
| | | { |
| | | stateDb.delete(txn, key); |
| | | changelogStateDb.delete(txn, key); |
| | | txn.commit(Durability.COMMIT_WRITE_NO_SYNC); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo(" In " |
| | | + this.replicationServer.getMonitorInstanceName() + " " |
| | | + methodInvocation + " succeeded " + status); |
| | | debug(methodInvocation + " succeeded"); |
| | | } |
| | | catch (RuntimeException dbe) |
| | | { |
| | |
| | | throw dbe; |
| | | } |
| | | } |
| | | return status; |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | | // can't happen |
| | | else |
| | | { |
| | | if (debugEnabled()) |
| | | debug(methodInvocation + " failed: key=[ " + keyString |
| | | + "] not found"); |
| | | } |
| | | } |
| | | catch (RuntimeException dbe) |
| | | { |
| | | // FIXME can actually happen (see catch above) |
| | | // what should we do about it? |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | |
| | | replicationServer.shutdown(); |
| | | } |
| | | |
| | | private void debug(String message) |
| | | { |
| | | TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + ", " |
| | | + message); |
| | | } |
| | | |
| | | } |