mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
30.26.2014 de36fa06856d8d04652401bb24e49c3259aef154
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -38,9 +38,12 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
@@ -67,6 +70,7 @@
  private ReplicationServer replicationServer;
  private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
  private static final String GENERATION_ID_TAG = "GENID";
  private static final String OFFLINE_TAG = "OFFLINE";
  private static final String FIELD_SEPARATOR = " ";
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
@@ -268,6 +272,19 @@
          }
          result.setDomainGenerationId(baseDN, generationId);
        }
        else if (prefix.equals(OFFLINE_TAG))
        {
          final String[] str = stringKey.split(FIELD_SEPARATOR, 3);
          final int serverId = toInt(str[1]);
          final DN baseDN = DN.decode(str[2]);
          long timestamp = ByteString.wrap(entry.getValue()).asReader().getLong();
          if (debugEnabled())
          {
            debug("has read replica offline: baseDN=" + baseDN + " serverId="
                + serverId);
          }
          result.addOfflineReplica(baseDN, new CSN(timestamp, 0, serverId));
        }
        else
        {
          final String[] str = stringData.split(FIELD_SEPARATOR, 2);
@@ -275,7 +292,8 @@
          final DN baseDN = DN.decode(str[1]);
          if (debugEnabled())
          {
            debug("has read replica: baseDN=" + baseDN + " serverId=" + serverId);
            debug("has read replica: baseDN=" + baseDN + " serverId="
                + serverId);
          }
          result.addServerIdToDomain(serverId, baseDN);
        }
@@ -416,7 +434,7 @@
      // Opens the DB for the changes received from this server on this domain.
      final Database replicaDB = openDatabase(replicaEntry.getKey());
      putInChangelogStateDBIfNotExist(replicaEntry);
      putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
      putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
      return replicaDB;
    }
@@ -436,7 +454,7 @@
   *          the replica's serverId
   * @return a database entry for the replica
   */
  Entry<String, String> toReplicaEntry(DN baseDN, int serverId)
  static Entry<String, String> toReplicaEntry(DN baseDN, int serverId)
  {
    final String key = serverId + FIELD_SEPARATOR + baseDN.toNormalizedString();
    return new SimpleImmutableEntry<String, String>(key, key);
@@ -452,30 +470,65 @@
   *          the domain's generationId
   * @return a database entry for the generationId
   */
  Entry<String, String> toGenIdEntry(DN baseDN, long generationId)
  static Entry<byte[], byte[]> toGenIdEntry(DN baseDN, long generationId)
  {
    final String normDn = baseDN.toNormalizedString();
    final String key = GENERATION_ID_TAG + FIELD_SEPARATOR + normDn;
    final String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId
        + FIELD_SEPARATOR + normDn;
    return new SimpleImmutableEntry<String, String>(key, data);
    return new SimpleImmutableEntry<byte[], byte[]>(toBytes(key),toBytes(data));
  }
  private void putInChangelogStateDBIfNotExist(Entry<String, String> entry)
      throws RuntimeException
  /**
   * Converts an Entry&lt;String, String&gt; to an Entry&lt;byte[], byte[]&gt;.
   *
   * @param entry
   *          the entry to convert
   * @return the converted entry
   */
  static Entry<byte[], byte[]> toByteArray(Entry<String, String> entry)
  {
    DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey()));
    return new SimpleImmutableEntry<byte[], byte[]>(
        toBytes(entry.getKey()),
        toBytes(entry.getValue()));
  }
  /**
   * Return an entry to store in the changelog state database representing the
   * time a replica went offline.
   *
   * @param baseDN
   *          the replica's baseDN
   * @param offlineCSN
   *          the replica's serverId and offline timestamp
   * @return a database entry representing the time a replica went offline
   */
  static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN)
  {
    final byte[] key =
        toBytes(OFFLINE_TAG + FIELD_SEPARATOR + offlineCSN.getServerId()
            + FIELD_SEPARATOR + baseDN.toNormalizedString());
    final ByteStringBuilder data = new ByteStringBuilder(8); // store a long
    data.append(offlineCSN.getTime());
    return new SimpleImmutableEntry<byte[], byte[]>(key, data.toByteArray());
  }
  private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry)
      throws ChangelogException, RuntimeException
  {
    DatabaseEntry key = new DatabaseEntry(entry.getKey());
    DatabaseEntry data = new DatabaseEntry();
    if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == NOTFOUND)
    {
      Transaction txn = dbEnvironment.beginTransaction(null, null);
      try
      {
        data.setData(toBytes(entry.getValue()));
        data.setData(entry.getValue());
        if (debugEnabled())
        {
          debug("putting record in the changelogstate Db key=["
              + entry.getKey() + "] value=[" + entry.getValue() + "]");
              + toString(entry.getKey()) + "] value=["
              + toString(entry.getValue()) + "]");
        }
        changelogStateDb.put(txn, key, data);
        txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
@@ -585,11 +638,11 @@
   */
  public void clearServerId(DN baseDN, int serverId) throws ChangelogException
  {
    deleteFromChangelogStateDB(toReplicaEntry(baseDN, serverId),
    deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
        "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
  }
  private void deleteFromChangelogStateDB(Entry<String, ?> entry,
  private void deleteFromChangelogStateDB(Entry<byte[], ?> entry,
      String methodInvocation) throws ChangelogException
  {
    if (debugEnabled())
@@ -599,7 +652,7 @@
    try
    {
      final DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey()));
      final DatabaseEntry key = new DatabaseEntry(entry.getKey());
      final DatabaseEntry data = new DatabaseEntry();
      if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == SUCCESS)
      {
@@ -620,18 +673,65 @@
          throw dbe;
        }
      }
      else
      else if (debugEnabled())
      {
        if (debugEnabled())
        {
          debug(methodInvocation + " failed: key=[ " + entry.getKey()
              + "] not found");
        }
        debug(methodInvocation + " failed: key not found");
      }
    }
    catch (RuntimeException dbe)
    catch (RuntimeException e)
    {
      throw new ChangelogException(dbe);
      if (debugEnabled())
      {
        debug(methodInvocation + " error: " + stackTraceToSingleLineString(e));
      }
      throw new ChangelogException(e);
    }
  }
  /**
   * Add the information about an offline replica to the changelog state DB.
   *
   * @param baseDN
   *          the domain of the offline replica
   * @param offlineCSN
   *          the offline replica serverId and offline timestamp
   * @throws ChangelogException
   *           if a database problem occurred
   */
  public void addOfflineReplica(DN baseDN, CSN offlineCSN)
      throws ChangelogException
  {
    // just overwrite any older entry as it is assumed a newly received offline
    // CSN is newer than the previous one
    putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
        "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
  }
  private void putInChangelogStateDB(Entry<byte[], byte[]> entry,
      String methodInvocation) throws ChangelogException
  {
    if (debugEnabled())
    {
      debug(methodInvocation + " starting");
    }
    try
    {
      final DatabaseEntry key = new DatabaseEntry(entry.getKey());
      final DatabaseEntry data = new DatabaseEntry(entry.getValue());
      changelogStateDb.put(null, key, data);
      if (debugEnabled())
      {
        debug(methodInvocation + " succeeded");
      }
    }
    catch (RuntimeException e)
    {
      if (debugEnabled())
      {
        debug(methodInvocation + " error: " + stackTraceToSingleLineString(e));
      }
      throw new ChangelogException(e);
    }
  }