
Two-Phase Commit


Solution

The essence of two-phase commit, unsurprisingly, is that it carries out an
update in two phases:

  • the first, prepare, asks each node if it can promise to carry out
    the update
  • the second, commit, actually carries it out.

As part of the prepare phase, each node participating in the transaction
acquires whatever it needs to assure that it will be able to do the
commit in the second phase, for instance any locks that are required.
Once each node is able to ensure it can commit in the second phase, it lets
the coordinator know, effectively promising the coordinator that it can and will
commit in the second phase. If any node is unable to make that promise, then
the coordinator tells all nodes to roll back, releasing any locks they hold,
and the transaction is aborted. Only if all the participants agree to go
ahead does the second phase begin, at which point it is expected they will
all successfully update.
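
The overall flow can be sketched as follows (a minimal sketch; the
Participant interface and its prepare, commit and rollback methods are
hypothetical stand-ins for the detailed implementation shown later):

  // Minimal sketch of the two-phase flow. Participant is a hypothetical
  // stand-in for the cluster nodes taking part in the transaction.
  boolean twoPhaseCommit(List<Participant> participants) {
      // Phase 1: ask every node to promise that it can commit.
      for (Participant participant : participants) {
          if (!participant.prepare()) {
              participants.forEach(Participant::rollback); //abort everywhere
              return false;
          }
      }
      // Phase 2: every node promised, so the update is carried out.
      participants.forEach(Participant::commit);
      return true;
  }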

Considering a simple distributed key-value store implementation,
the two-phase commit protocol works as follows.

The transactional client creates a unique identifier called a transaction identifier.
The client also keeps track of other details like the transaction start time.
This is used, as described later, by the locking mechanism to prevent deadlocks.
The unique id, along with the additional details like the start timestamp,
is used to refer to the transaction across the cluster nodes.
The client maintains a transaction reference as follows, which is passed along
with every request from the client to other cluster nodes.

class TransactionRef…

  private UUID txnId;
  private long startTimestamp;


  public TransactionRef(long startTimestamp) {
      this.txnId = UUID.randomUUID();
      this.startTimestamp = startTimestamp;
  }

class TransactionClient…

  TransactionRef transactionRef;

  public TransactionClient(ReplicaMapper replicaMapper, SystemClock systemClock) {
      this.clock = systemClock;
      this.transactionRef = new TransactionRef(clock.now());
      this.replicaMapper = replicaMapper;
  }

One of the cluster nodes acts as a coordinator, which tracks the status of the
transaction on behalf of the client.
In a key-value store, it is generally the cluster node holding data for
one of the keys. It is generally picked as the cluster node storing data
for the first key used by the client.

Before storing any value, the client communicates with the coordinator to notify
it about the start of the transaction.
Because the coordinator is one of the cluster nodes storing values,
it is picked dynamically when the client initiates a get or put operation
with a particular key.

class TransactionClient…

  private TransactionalKVStore coordinator;
  private void maybeBeginTransaction(String key) {
      if (coordinator == null) {
          coordinator = replicaMapper.serverFor(key);
          coordinator.begin(transactionRef);
      }
  }
      }
  }

The transaction coordinator keeps track of the status of the transaction.
It records every change in a Write-Ahead Log to make sure
that the details are available in case of a crash.

class TransactionCoordinator…

  Map<TransactionRef, TransactionMetadata> transactions = new ConcurrentHashMap<>();
  WriteAheadLog transactionLog;

  public void begin(TransactionRef transactionRef) {
      TransactionMetadata txnMetadata = new TransactionMetadata(transactionRef, systemClock, transactionTimeoutMs);
      transactionLog.writeEntry(txnMetadata.serialize());
      transactions.put(transactionRef, txnMetadata);
  }

class TransactionMetadata…

  private TransactionRef txn;
  private List<String> participatingKeys = new ArrayList<>();
  private TransactionStatus transactionStatus;

The client sends each key which is part of the transaction to the coordinator.
This way the coordinator tracks all the keys which are part of the transaction.
The coordinator records the keys which are part of the transaction in
the transaction metadata. The keys can then be used to know about all the
cluster nodes which are part of the transaction.
Because each key-value is generally replicated with the
Replicated Log,
the leader server handling the requests for a particular key might change
over the lifetime of the transaction, so the keys are tracked instead of
the actual server addresses.
The client then sends the put or get requests to the server holding the data
for the key. The server is picked based on the partitioning strategy.
The thing to note is that the client directly communicates with the server
and not through the coordinator. This avoids sending data twice over the network,
from client to coordinator, and then from coordinator to the respective server.

class TransactionClient…

  public CompletableFuture<String> get(String key) {
      maybeBeginTransaction(key);
      coordinator.addKeyToTransaction(transactionRef, key);
      TransactionalKVStore kvStore = replicaMapper.serverFor(key);
      return kvStore.get(transactionRef, key);
  }

  public void put(String key, String value) {
      maybeBeginTransaction(key);
      coordinator.addKeyToTransaction(transactionRef, key);
      replicaMapper.serverFor(key).put(transactionRef, key, value);
  }

class TransactionCoordinator…

  public synchronized void addKeyToTransaction(TransactionRef transactionRef, String key) {
      TransactionMetadata metadata = transactions.get(transactionRef);
      if (!metadata.getParticipatingKeys().contains(key)) {
          metadata.addKey(key);
          transactionLog.writeEntry(metadata.serialize());
      }
  }

The cluster node handling the request detects that the request is part of a
transaction from the transaction ID. It manages the state of the transaction,
where it stores the key and the value from the request. The key values are not
directly made available to the key-value store, but stored separately.

class TransactionalKVStore…

  public void put(TransactionRef transactionRef, String key, String value) {
      TransactionState state = getOrCreateTransactionState(transactionRef);
      state.addPendingUpdates(key, value);
  }
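
The TransactionState class itself is not listed here; a minimal sketch of
what it might hold, assuming it simply buffers pending writes and tracks
the transaction status, looks like this:

  // Hypothetical sketch of TransactionState: buffers writes until commit.
  class TransactionState {
      private Map<String, String> pendingUpdates = new HashMap<>();
      private TransactionStatus status;

      public void addPendingUpdates(String key, String value) {
          pendingUpdates.put(key, value); //not yet visible in the key-value store
      }

      public Optional<Map<String, String>> getPendingUpdates() {
          return pendingUpdates.isEmpty() ? Optional.empty() : Optional.of(pendingUpdates);
      }

      public void markPrepared() { status = TransactionStatus.PREPARED; }
      public boolean isPrepared() { return status == TransactionStatus.PREPARED; }
      public boolean isAborted() { return status == TransactionStatus.ROLLED_BACK; }
  }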

Locks and Transaction Isolation

The requests also take a lock on the keys.
Notably, the get requests take a read lock and
the put requests take a write lock. The read locks are taken as the
values are read.

class TransactionalKVStore…

  public CompletableFuture<String> get(TransactionRef txn, String key) {
      CompletableFuture<TransactionRef> lockFuture
              = lockManager.acquire(txn, key, LockMode.READ);
      return lockFuture.thenApply(transactionRef -> {
          getOrCreateTransactionState(transactionRef);
          return kv.get(key);
      });
  }

  synchronized TransactionState getOrCreateTransactionState(TransactionRef txnRef) {
      TransactionState state = this.ongoingTransactions.get(txnRef);
      if (state == null) {
          state = new TransactionState();
          this.ongoingTransactions.put(txnRef, state);
      }
      return state;
  }

The write locks are taken only when the transaction is about to commit
and the values are to be made visible in the key-value store. Until then, the
cluster node can just track the modified values as pending operations.

Delaying locking decreases the chances of conflicting transactions.

class TransactionalKVStore…

  public void put(TransactionRef transactionRef, String key, String value) {
      TransactionState state = getOrCreateTransactionState(transactionRef);
      state.addPendingUpdates(key, value);
  }

It is important to note that the locks are long-lived and not released
when the request completes. They are released only when the transaction commits.
This technique of holding locks for the duration of the transaction
and releasing them only when the transaction commits or rolls back is
called two-phase locking.
Two-phase locking is critical in providing the serializable isolation level,
serializable meaning that the effects of the transactions are visible as
if they are executed one at a time.

Deadlock Prevention

Usage of locks can cause deadlocks where two transactions wait for
each other to release their locks. Deadlocks can be avoided if transactions
are not allowed to wait and are aborted when conflicts are detected.
There are different strategies used to decide which transactions are
aborted and which are allowed to continue.

The lock manager implements these wait policies as follows:

class LockManager…

  WaitPolicy waitPolicy;

The WaitPolicy decides what to do when there are conflicting requests.

public enum WaitPolicy {
    WoundWait,
    WaitDie,
    Error
}

The lock is an object which tracks the transactions which currently
own the lock and the ones which are waiting for it.

class Lock…

  Queue<LockRequest> waitQueue = new LinkedList<>();
  List<TransactionRef> owners = new ArrayList<>();
  LockMode lockMode;

When a transaction requests to acquire a lock, the lock manager grants
the lock immediately if there are no conflicting transactions already
owning the lock.

class LockManager…

  public synchronized CompletableFuture<TransactionRef> acquire(TransactionRef txn, String key, LockMode lockMode) {
      return acquire(txn, key, lockMode, new CompletableFuture<>());
  }

  CompletableFuture<TransactionRef> acquire(TransactionRef txnRef,
                                            String key,
                                            LockMode askedLockMode,
                                            CompletableFuture<TransactionRef> lockFuture) {
      Lock lock = getOrCreateLock(key);

      logger.debug("acquiring lock for = " + txnRef + " on key = " + key + " with lock mode = " + askedLockMode);
      if (lock.isCompatible(txnRef, askedLockMode)) {
          lock.addOwner(txnRef, askedLockMode);
          lockFuture.complete(txnRef);
          logger.debug("acquired lock for = " + txnRef);
          return lockFuture;
      }

      return handleConflict(lock, txnRef, key, askedLockMode, lockFuture);
  }

class Lock…

  public boolean isCompatible(TransactionRef txnRef, LockMode lockMode) {
      if (hasOwner()) {
          //shared read locks are compatible; otherwise only a lock upgrade
          //by the same transaction is allowed.
          return (this.lockMode == LockMode.READ && lockMode == LockMode.READ)
                  || isUpgrade(txnRef, lockMode);
      }
      return true;
  }

If there are conflicts,
the lock manager acts depending on the wait policy.

Error On Conflict

If the wait policy is to error out, it will throw an error and the calling
transaction will roll back and retry after a random timeout.

class LockManager…

  private CompletableFuture<TransactionRef> handleConflict(Lock lock,
                                                           TransactionRef txnRef,
                                                           String key,
                                                           LockMode askedLockMode,
                                                           CompletableFuture<TransactionRef> lockFuture) {
      switch (waitPolicy) {
          case Error: {
              lockFuture.completeExceptionally(new WriteConflictException(txnRef, key, lock.owners));
              return lockFuture;
          }
          case WoundWait: {
              return lock.woundWait(txnRef, key, askedLockMode, lockFuture, this);
          }
          case WaitDie: {
              return lock.waitDie(txnRef, key, askedLockMode, lockFuture, this);
          }
      }
      throw new IllegalArgumentException("Unknown waitPolicy " + waitPolicy);
  }

In cases of contention, when there are a lot of user transactions
trying to acquire locks, if all of them need to restart it severely
limits the system's throughput.
Data stores try to make sure that there are minimal transaction restarts.

A common technique is to assign a unique ID to transactions and order
them. For example, Spanner assigns unique IDs to transactions
in such a way that they can be ordered.
The technique is very similar to the one discussed in
Paxos to order requests across cluster nodes.
Once the transactions can be ordered, there are two techniques used
to avoid deadlock, but still allow transactions to continue without
restarting.

The transaction reference is created in such a way that it can be
compared and ordered with other transaction references. The easiest
method is to assign a timestamp to each transaction and compare based
on the timestamp.

class TransactionRef…

  boolean after(TransactionRef otherTransactionRef) {
      return this.startTimestamp > otherTransactionRef.startTimestamp;
  }

But in distributed systems,
wall clocks are not monotonic,
so a different method is used, such as
assigning unique IDs to transactions in such a way that
they can be ordered. Along with the ordered IDs, the age of each
transaction is tracked to be able to order the transactions.
Spanner orders transactions by tracking the age of each
transaction in the system.

To be able to order all the transactions, each cluster node is assigned
a unique ID. The client picks the coordinator at the start of
the transaction and gets the transaction ID from the coordinator.
The cluster node acting as a coordinator generates transaction
IDs as follows.

class TransactionCoordinator…

  private int requestId;
  public MonotonicId begin() {
      return new MonotonicId(requestId++, config.getServerId());
  }
  }

class MonotonicId…

  public class MonotonicId implements Comparable<MonotonicId> {
      public int requestId;
      int serverId;
  
      public MonotonicId(int requestId, int serverId) {
          this.serverId = serverId;
          this.requestId = requestId;
      }
  
      public static MonotonicId empty() {
          return new MonotonicId(-1, -1);
      }
  
      public boolean isAfter(MonotonicId other) {
          if (this.requestId == other.requestId) {
              return this.serverId > other.serverId;
          }
          return this.requestId > other.requestId;
      }

class TransactionClient…

  private void beginTransaction(String key) {
      if (coordinator == null) {
          coordinator = replicaMapper.serverFor(key);
          MonotonicId transactionId = coordinator.begin();
          transactionRef = new TransactionRef(transactionId, clock.nanoTime());
      }
  }

The client tracks the age of the transaction by recording
the elapsed time since the beginning of the transaction.

class TransactionRef…

  public void incrementAge(SystemClock clock) {
      age = clock.nanoTime() - startTimestamp;
  }

The client increments the age every time a get or a put request
is sent to the servers. The transactions are then ordered by
their age. The transaction id is used to break ties when
there are transactions of the same age.

class TransactionRef…

  public boolean isAfter(TransactionRef other) {
       return age == other.age ?
                  this.id.isAfter(other.id)
                  : this.age > other.age;
  }

Wound-Wait

In the wound-wait method, if there is a conflict,
the transaction reference asking for the lock is compared to all the
transactions currently owning the lock. If the lock owners are all
younger than the transaction asking for the lock, all of those transactions are aborted.
But if the transaction asking for the lock is younger than the ones owning
the lock, it waits for the lock.

class Lock…

  public CompletableFuture<TransactionRef> woundWait(TransactionRef txnRef,
                                                     String key,
                                                     LockMode askedLockMode,
                                                     CompletableFuture<TransactionRef> lockFuture,
                                                     LockManager lockManager) {

      if (allOwningTransactionsStartedAfter(txnRef) && !anyOwnerIsPrepared(lockManager)) {
          abortAllOwners(lockManager, key, txnRef);
          return lockManager.acquire(txnRef, key, askedLockMode, lockFuture);
      }

      LockRequest lockRequest = new LockRequest(txnRef, key, askedLockMode, lockFuture);
      lockManager.logger.debug("Adding to wait queue = " + lockRequest);
      addToWaitQueue(lockRequest);
      return lockFuture;
  }

class Lock…

  private boolean allOwningTransactionsStartedAfter(TransactionRef txn) {
      return owners.stream().filter(o -> !o.equals(txn)).allMatch(owner -> owner.after(txn));
  }

One of the key things to note is that if a transaction owning
the lock is already in the prepared state of the two-phase commit, it is
not aborted.
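
This is the anyOwnerIsPrepared check in woundWait above. The helper is not
listed here; a minimal sketch, assuming the lock manager can look up the
TransactionState of each owner (a hypothetical lookup), might be:

class Lock…

  //Hypothetical sketch: never wound an owner that has already promised to commit.
  private boolean anyOwnerIsPrepared(LockManager lockManager) {
      return owners.stream()
              .anyMatch(owner -> lockManager.getTransactionState(owner).isPrepared());
  }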

Wait-Die

The wait-die method works in the opposite way
to wound-wait.
If the lock owners are all younger than the transaction
asking for the lock, then the transaction waits for the lock.
But if the transaction asking for the lock is younger than the ones owning
the lock, the transaction is aborted.

class Lock…

  public CompletableFuture<TransactionRef> waitDie(TransactionRef txnRef,
                                                   String key,
                                                   LockMode askedLockMode,
                                                   CompletableFuture<TransactionRef> lockFuture,
                                                   LockManager lockManager) {
      if (allOwningTransactionsStartedAfter(txnRef)) {
          addToWaitQueue(new LockRequest(txnRef, key, askedLockMode, lockFuture));
          return lockFuture;
      }

      lockManager.abort(txnRef, key);
      lockFuture.completeExceptionally(new WriteConflictException(txnRef, key, owners));
      return lockFuture;
  }

The wound-wait mechanism generally has
fewer restarts
compared to the wait-die method,
so data stores like Spanner use the wound-wait
method.

When the transaction owning the lock releases it,
the waiting transactions are granted the lock.

class LockManager…

  private void release(TransactionRef txn, String key) {
      Optional<Lock> lock = getLock(key);
      lock.ifPresent(l -> {
          l.release(txn, this);
      });
  }

class Lock…

  public void release(TransactionRef txn, LockManager lockManager) {
      removeOwner(txn);
      if (hasWaiters()) {
          LockRequest lockRequest = getFirst(lockManager.waitPolicy);
          lockManager.acquire(lockRequest.txn, lockRequest.key, lockRequest.lockMode, lockRequest.future);
      }
  }
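
The getFirst helper picks which waiting request to wake up. It is not listed
here; a plausible sketch, assuming the wait queue should be drained oldest
transaction first under the age-based policies, is:

class Lock…

  //Hypothetical sketch: prefer the oldest waiting transaction under
  //age-based wait policies, plain FIFO otherwise.
  private LockRequest getFirst(WaitPolicy waitPolicy) {
      if (waitPolicy == WaitPolicy.Error) {
          return waitQueue.remove();
      }
      LockRequest oldest = waitQueue.stream()
              .min((r1, r2) -> r1.txn.isAfter(r2.txn) ? 1 : -1)
              .orElseThrow();
      waitQueue.remove(oldest);
      return oldest;
  }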

Commit and Rollback

Once the client successfully reads without facing any conflicts and
writes all the key values, it initiates the commit by sending
a commit request to the coordinator.

class TransactionClient…

  public CompletableFuture<Boolean> commit() {
      return coordinator.commit(transactionRef);
  }

The transaction coordinator records the state of the transaction as
preparing to commit. The coordinator implements the commit handling in
two phases.

  • It first sends the prepare request to each of the participants.
  • Once it receives a successful response from all the participants,
    the coordinator marks the transaction as prepared to complete.
    Then it sends the commit request to all the participants.

class TransactionCoordinator…

  public CompletableFuture<Boolean> commit(TransactionRef transactionRef)  {
      TransactionMetadata metadata = transactions.get(transactionRef);
      metadata.markPreparingToCommit(transactionLog);
      List<CompletableFuture<Boolean>> allPrepared = sendPrepareRequestToParticipants(transactionRef);
      CompletableFuture<List<Boolean>> futureList = sequence(allPrepared);
      return futureList.thenApply(result -> {
          if (!result.stream().allMatch(r -> r)) {
              logger.info("Rolling back = " + transactionRef);
              rollback(transactionRef);
              return false;
          }
          metadata.markPrepared(transactionLog);
          sendCommitMessageToParticipants(transactionRef);
          metadata.markCommitComplete(transactionLog);
          return true;
      });
  }

  public List<CompletableFuture<Boolean>> sendPrepareRequestToParticipants(TransactionRef transactionRef)  {
      TransactionMetadata transactionMetadata = transactions.get(transactionRef);
      var transactionParticipants = getParticipants(transactionMetadata.getParticipatingKeys());
      return transactionParticipants.keySet()
              .stream()
              .map(server -> server.handlePrepare(transactionRef))
              .collect(Collectors.toList());
  }

  private void sendCommitMessageToParticipants(TransactionRef transactionRef) {
      TransactionMetadata transactionMetadata = transactions.get(transactionRef);
      var participantsForKeys = getParticipants(transactionMetadata.getParticipatingKeys());
      participantsForKeys.keySet().stream()
              .forEach(kvStore -> {
                  List<String> keys = participantsForKeys.get(kvStore);
                  kvStore.handleCommit(transactionRef, keys);
              });
  }

  private Map<TransactionalKVStore, List<String>> getParticipants(List<String> participatingKeys) {
      return participatingKeys.stream()
              .map(k -> Pair.of(serverFor(k), k))
              .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList())));
  }

The cluster node receiving the prepare request does two things:

  • It tries to grab the write locks for all of the keys.
  • Once successful, it writes all of the changes to the write-ahead log.

If it can successfully do these,
it can guarantee that there are no conflicting transactions,
and even in the case of a crash the cluster node can recover all the
required state to complete the transaction.

class TransactionalKVStore…

  public synchronized CompletableFuture<Boolean> handlePrepare(TransactionRef txn) {
      try {
          TransactionState state = getTransactionState(txn);
          if (state.isPrepared()) {
              return CompletableFuture.completedFuture(true); //already prepared.
          }

          if (state.isAborted()) {
              return CompletableFuture.completedFuture(false); //aborted by another transaction.
          }

          Optional<Map<String, String>> pendingUpdates = state.getPendingUpdates();
          CompletableFuture<Boolean> prepareFuture = prepareUpdates(txn, pendingUpdates);
          return prepareFuture.thenApply(ignored -> {
              Map<String, Lock> locksHeldByTxn = lockManager.getAllLocksFor(txn);
              state.markPrepared();
              writeToWAL(new TransactionMarker(txn, locksHeldByTxn, TransactionStatus.PREPARED));
              return true;
          });

      } catch (TransactionException | WriteConflictException e) {
          logger.error(e);
      }
      return CompletableFuture.completedFuture(false);
  }

  private CompletableFuture<Boolean> prepareUpdates(TransactionRef txn, Optional<Map<String, String>> pendingUpdates)  {
      if (pendingUpdates.isPresent()) {
          Map<String, String> pendingKVs = pendingUpdates.get();
          CompletableFuture<List<TransactionRef>> lockFuture = acquireLocks(txn, pendingKVs.keySet());
          return lockFuture.thenApply(ignored -> {
              writeToWAL(txn, pendingKVs);
              return true;
          });
      }
      return CompletableFuture.completedFuture(true);
  }

  TransactionState getTransactionState(TransactionRef txnRef) {
      return ongoingTransactions.get(txnRef);
  }

  private void writeToWAL(TransactionRef txn, Map<String, String> pendingUpdates) {
     for (String key : pendingUpdates.keySet()) {
          String value = pendingUpdates.get(key);
          wal.writeEntry(new SetValueCommand(txn, key, value).serialize());
      }
  }

  private CompletableFuture<List<TransactionRef>> acquireLocks(TransactionRef txn, Set<String> keys) {
      List<CompletableFuture<TransactionRef>> lockFutures = new ArrayList<>();
      for (String key : keys) {
          CompletableFuture<TransactionRef> lockFuture = lockManager.acquire(txn, key, LockMode.READWRITE);
          lockFutures.add(lockFuture);
      }
      return sequence(lockFutures);
  }
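
The sequence helper used above converts a list of futures into a future of a
list. It is not listed here, but a common sketch of it looks like this:

  //Sketch of the sequence helper: completes when all the given futures complete.
  private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
      return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
              .thenApply(ignored -> futures.stream()
                      .map(CompletableFuture::join)
                      .collect(Collectors.toList()));
  }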

When the cluster node receives the commit message from the coordinator,
it is safe to make the key-value changes visible.
The cluster node does three things while committing the changes:

  • It marks the transaction as committed. Should the cluster node fail at this point,
    it knows the outcome of the transaction, and can repeat the following steps.
  • It applies all the changes to the key-value storage.
  • It releases all the acquired locks.

class TransactionalKVStore…

  public synchronized void handleCommit(TransactionRef transactionRef, List<String> keys) {
      if (!ongoingTransactions.containsKey(transactionRef)) {
          return; //this is a no-op. Already committed.
      }

      if (!lockManager.hasLocksFor(transactionRef, keys)) {
          throw new IllegalStateException("Transaction " + transactionRef + " should hold all the required locks for keys " + keys);
      }

      writeToWAL(new TransactionMarker(transactionRef, TransactionStatus.COMMITTED, keys));

      applyPendingUpdates(transactionRef);

      releaseLocks(transactionRef, keys);
  }

  private void removeTransactionState(TransactionRef txnRef) {
      ongoingTransactions.remove(txnRef);
  }


  private void applyPendingUpdates(TransactionRef txnRef) {
      TransactionState state = getTransactionState(txnRef);
      Optional<Map<String, String>> pendingUpdates = state.getPendingUpdates();
      apply(txnRef, pendingUpdates);
  }

  private void apply(TransactionRef txnRef, Optional<Map<String, String>> pendingUpdates) {
      if (pendingUpdates.isPresent()) {
          Map<String, String> pendingKv = pendingUpdates.get();
          apply(pendingKv);
      }
      removeTransactionState(txnRef);
  }

  private void apply(Map<String, String> pendingKv) {
      for (String key : pendingKv.keySet()) {
          String value = pendingKv.get(key);
          kv.put(key, value);
      }
  }

  private void releaseLocks(TransactionRef txn, List<String> keys) {
          lockManager.release(txn, keys);
  }

  private Long writeToWAL(TransactionMarker transactionMarker) {
     return wal.writeEntry(transactionMarker.serialize());
  }

The rollback is implemented in a similar way. If there is any failure,
the client communicates with the coordinator to roll back the transaction.

class TransactionClient…

  public void rollback() {
      coordinator.rollback(transactionRef);
  }

The transaction coordinator records the state of the transaction as preparing
to rollback. Then it forwards the rollback request to all of the servers
which stored the values for the given transaction.
Once all of the requests are successful, the coordinator marks the transaction
rollback as complete. In case the coordinator crashes after the transaction
is marked as 'prepared to rollback', it can keep on sending the rollback
messages to all the participating cluster nodes.

class TransactionCoordinator…

  public void rollback(TransactionRef transactionRef) {
      transactions.get(transactionRef).markPrepareToRollback(this.transactionLog);

      sendRollbackMessageToParticipants(transactionRef);

      transactions.get(transactionRef).markRollbackComplete(this.transactionLog);
  }

  private void sendRollbackMessageToParticipants(TransactionRef transactionRef) {
      TransactionMetadata transactionMetadata = transactions.get(transactionRef);
      var participants = getParticipants(transactionMetadata.getParticipatingKeys());
      for (TransactionalKVStore kvStore : participants.keySet()) {
          List<String> keys = participants.get(kvStore);
          kvStore.handleRollback(transactionMetadata.getTxn(), keys);
      }
  }

The cluster node receiving the rollback request does three things:

  • It records the state of the transaction as rolled back in the write-ahead log.
  • It discards the transaction state.
  • It releases all of the locks.

class TransactionalKVStore…

  public synchronized void handleRollback(TransactionRef transactionRef, List<String> keys) {
      if (!ongoingTransactions.containsKey(transactionRef)) {
          return; //no-op. Already rolled back.
      }
      writeToWAL(new TransactionMarker(transactionRef, TransactionStatus.ROLLED_BACK, keys));
      this.ongoingTransactions.remove(transactionRef);
      this.lockManager.release(transactionRef, keys);
  }

Idempotent Operations

In case of network failures, the coordinator can retry calls to
prepare, commit or abort. So these operations need to be
idempotent.
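
The no-op checks at the start of handleCommit and handleRollback above are
what make these retries safe. A coordinator-side retry might then look like
the following sketch (the NetworkException type is a hypothetical stand-in
for whatever the transport layer throws):

  //Hypothetical sketch: retrying the commit message is safe because
  //handleCommit is a no-op once the transaction is already committed.
  private void sendCommitWithRetry(TransactionalKVStore kvStore, TransactionRef txn, List<String> keys) {
      while (true) {
          try {
              kvStore.handleCommit(txn, keys);
              return;
          } catch (NetworkException e) {
              //keep retrying; a duplicate delivery is harmless.
          }
      }
  }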

An Example Scenario

Atomic Writes

Consider the following scenario. Paula Blue has a truck and Steven Green
has a backhoe.
The availability and the booking status of the truck and the backhoe
are stored on a distributed key-value store.
Depending on how the keys are mapped to servers,
Blue's truck and Green's backhoe bookings are stored on
separate cluster nodes.
Alice is trying to book a truck
and backhoe for the construction work she is planning to start on a Monday.
She needs both the truck and the backhoe to be available.

The booking scenario happens as follows.

Alice checks the availability of Blue's truck and Green's backhoe
by reading the keys 'truck_booking_monday' and 'backhoe_booking_monday'.

If the values are empty, the booking is free.
She reserves the truck and the backhoe.
It is important that both the values are set atomically.
If there is any failure, then neither of the values is set.

The commit happens in two phases. The first server Alice
contacts acts as the coordinator and executes the two phases.

The coordinator is a separate participant in the
protocol, and is shown that way on the sequence diagram. However
usually one of the servers (Blue or Green) acts as
the coordinator, thus playing two roles in the interaction.

Conflicting Transactions

Consider a scenario where another person, Bob, is also trying to book a
truck and backhoe for construction work on the same Monday.

The booking scenario happens as follows:

  • Both Alice and Bob read the keys 'truck_booking_monday'
    and 'backhoe_booking_monday'
  • Both see that the values are empty, meaning the booking is free.
  • Both try to book the truck and the backhoe.

The expectation is that only Alice or Bob should be able to book,
because the transactions are conflicting.
In case of errors, the whole flow needs to be retried and hopefully
one will go ahead with the booking.
But in no situation should a booking be done partially.
Either both bookings should be done or neither is.

To check the availability, both Alice and Bob start a transaction
and contact Blue and Green's servers respectively to check for availability.
Blue holds a read lock for the key "truck_booking_on_monday" and
Green holds a read lock for the key "backhoe_booking_on_monday".
Because read locks are shared, both Alice and Bob can read the values.

Alice and Bob see that both the bookings are available on Monday.
So they reserve by sending the put requests to the servers.
Both the servers hold the put requests in temporary storage.

When Alice and Bob decide to commit the transactions,
assuming that Blue acts as the coordinator, it triggers the two-phase
commit protocol and sends the prepare requests to itself and Green.

For Alice's request it tries to grab a write lock for the key 'truck_booking_on_monday', which
it cannot get, because there is a conflicting read lock taken by
another transaction. So Alice's transaction fails in the prepare phase.
The same thing happens with Bob's request.

Transactions can be retried with a retry loop as follows:

class TransactionExecutor…

  public boolean executeWithRetry(Function<TransactionClient, Boolean> txnMethod, ReplicaMapper replicaMapper, SystemClock systemClock) {
      for (int attempt = 1; attempt <= maxRetries; attempt++) {
          TransactionClient client = new TransactionClient(replicaMapper, systemClock);
          try {
              boolean checkPassed = txnMethod.apply(client);
              Boolean successfullyCommitted = client.commit().get();
              return checkPassed && successfullyCommitted;
          } catch (Exception e) {
              logger.error("Write conflict detected while executing." + client.transactionRef + " Retrying attempt " + attempt);
              client.rollback();
              randomWait(); //wait for a random interval
          }
          }

      }
      return false;
  }

The example booking code for Alice and Bob will look as follows:

class TransactionalKVStoreTest…

  @Test
  public void retryWhenConflict() {
      List<TransactionalKVStore> allServers = createTestServers(WaitPolicy.WoundWait);
      TransactionExecutor aliceTxn = bookTransactionally(allServers, "Alice", new TestClock());
      TransactionExecutor bobTxn = bookTransactionally(allServers, "Bob", new TestClock());

      TestUtils.waitUntilTrue(() -> (aliceTxn.isSuccess() && !bobTxn.isSuccess())
                      || (!aliceTxn.isSuccess() && bobTxn.isSuccess()),
              "waiting for one txn to complete", Duration.ofSeconds(50));
  }


  private TransactionExecutor bookTransactionally(List<TransactionalKVStore> allServers, String user, SystemClock systemClock) {
      List<String> bookingKeys = Arrays.asList("truck_booking_on_monday", "backhoe_booking_on_monday");
      TransactionExecutor t1 = new TransactionExecutor(allServers);
      t1.executeAsyncWithRetry(txnClient -> {
          if (txnClient.isAvailable(bookingKeys)) {
              txnClient.reserve(bookingKeys, user);
              return true;
          }
          return false;
      }, systemClock);
      return t1;
  }

In this case one of the transactions will eventually succeed and
the other will back out.

While it is very easy to implement, with the Error WaitPolicy
there will be a lot of transaction restarts, reducing the overall
throughput.
As explained in the above section, if the Wound-Wait policy is used
there will be fewer transaction restarts. In the above example,
only one transaction will possibly restart instead of both restarting
in case of conflicts.

Using Versioned Value

It is very constraining to have conflicts for all the read and write
operations, particularly so when the transactions can be read-only.
It is optimal if read-only transactions can work without holding any
locks and still guarantee that the values read in a transaction
do not change with a concurrent read-write transaction.

Data stores generally store multiple versions of the values,
as described in Versioned Value.
The version used is the timestamp following a Lamport Clock.
Mostly a Hybrid Clock is used in databases like
MongoDB or CockroachDB.
To use it with the two-phase commit protocol, the trick is that every server
participating in the transaction sends the timestamp it can write the
values at, as a response to the prepare request.
The coordinator chooses the maximum of these timestamps as the
commit timestamp and sends it along with the value.
The participating servers then save the value at the commit timestamp.
This allows read-only requests to be executed without holding locks,
because it is guaranteed that the value written at a particular timestamp
is never going to change.

Consider a simple example as follows. Philip is running a report to read
all of the bookings that happened up to timestamp 2. If it is a long-running
operation holding a lock, Alice, who is trying to book a truck, will be blocked
until Philip's work completes. With Versioned Value
Philip's get requests, which are part of a read-only operation, can proceed
at timestamp 2, while Alice's booking continues at timestamp 4.

Note that read requests which are part of a read-write transaction
still need to hold a lock.

The example code with a Lamport Clock looks as follows:

class MvccTransactionalKVStore…

  public String readOnlyGet(String key, long readTimestamp) {
      adjustServerTimestamp(readTimestamp);
      return kv.get(new VersionedKey(key, readTimestamp));
  }

  public CompletableFuture<String> get(TransactionRef txn, String key, long readTimestamp) {
      adjustServerTimestamp(readTimestamp);
      CompletableFuture<TransactionRef> lockFuture = lockManager.acquire(txn, key, LockMode.READ);
      return lockFuture.thenApply(transactionRef -> {
          getOrCreateTransactionState(transactionRef);
          return kv.get(key);
      });
  }

  private void adjustServerTimestamp(long readTimestamp) {
      this.timestamp = readTimestamp > this.timestamp ? readTimestamp : timestamp;
  }

  public void put(TransactionRef txnId, String key, String value) {
      timestamp = timestamp + 1;
      TransactionState transactionState = getOrCreateTransactionState(txnId);
      transactionState.addPendingUpdates(key, value);
  }

class MvccTransactionalKVStore…

  private long prepare(TransactionRef txn, Optional<Map<String, String>> pendingUpdates) throws WriteConflictException, IOException {
      if (pendingUpdates.isPresent()) {
          Map<String, String> pendingKVs = pendingUpdates.get();

          acquireLocks(txn, pendingKVs);

          timestamp = timestamp + 1; //increment the timestamp for the write operation.

          writeToWAL(txn, pendingKVs, timestamp);
       }
      return timestamp;
  }

class MvccTransactionCoordinator…

  public long commit(TransactionRef txn) {
          long commitTimestamp = prepare(txn);

          TransactionMetadata transactionMetadata = transactions.get(txn);
          transactionMetadata.markPreparedToCommit(commitTimestamp, this.transactionLog);

          sendCommitMessageToAllTheServers(txn, commitTimestamp, transactionMetadata.getParticipatingKeys());

          transactionMetadata.markCommitComplete(transactionLog);

          return commitTimestamp;
  }


  public long prepare(TransactionRef txn) throws WriteConflictException {
      TransactionMetadata transactionMetadata = transactions.get(txn);
      Map<MvccTransactionalKVStore, List<String>> keysToServers = getParticipants(transactionMetadata.getParticipatingKeys());
      List<Long> prepareTimestamps = new ArrayList<>();
      for (MvccTransactionalKVStore store : keysToServers.keySet()) {
          List<String> keys = keysToServers.get(store);
          long prepareTimestamp = store.prepare(txn, keys);
          prepareTimestamps.add(prepareTimestamp);
      }
      return prepareTimestamps.stream().max(Long::compare).orElse(txn.getStartTimestamp());
  }

All the participating cluster nodes then store the key-values at the
commit timestamp.

class MvccTransactionalKVStore…

  public void commit(TransactionRef txn, List<String> keys, long commitTimestamp) {
      if (!lockManager.hasLocksFor(txn, keys)) {
          throw new IllegalStateException("Transaction should hold all the required locks");
      }

      adjustServerTimestamp(commitTimestamp);

      applyPendingOperations(txn, commitTimestamp);

      lockManager.release(txn, keys);

      logTransactionMarker(new TransactionMarker(txn, TransactionStatus.COMMITTED, commitTimestamp, keys, Collections.EMPTY_MAP));
  }

  private void applyPendingOperations(TransactionRef txnId, long commitTimestamp) {
      Optional<TransactionState> transactionState = getTransactionState(txnId);
      if (transactionState.isPresent()) {
          TransactionState t = transactionState.get();
          Optional<Map<String, String>> pendingUpdates = t.getPendingUpdates();
          apply(txnId, pendingUpdates, commitTimestamp);
      }
  }

  private void apply(TransactionRef txnId, Optional<Map<String, String>> pendingUpdates, long commitTimestamp) {
      if (pendingUpdates.isPresent()) {
          Map<String, String> pendingKv = pendingUpdates.get();
          apply(pendingKv, commitTimestamp);
      }
      ongoingTransactions.remove(txnId);
  }


  private void apply(Map<String, String> pendingKv, long commitTimestamp) {
      for (String key : pendingKv.keySet()) {
          String value = pendingKv.get(key);
          kv.put(new VersionedKey(key, commitTimestamp), value);
      }
  }

Technical Considerations

There is another subtle issue to be tackled here.
Once a particular response is returned at a given timestamp,
no write should happen at a lower timestamp than the one received in
the read request.
This is achieved by different techniques.
Google Percolator and
datastores like TiKV inspired by
Percolator use a separate server called a Timestamp oracle which is
guaranteed to give monotonic timestamps.
Databases like MongoDB or CockroachDB
use a Hybrid Clock to
guarantee it, because every request adjusts the hybrid clock
on each server to be the most up-to-date. The timestamp is also
advanced monotonically with every write request.
Finally, the commit phase picks the maximum timestamp across the set
of participating servers, making sure that the write will always
follow a previous read request.

It is important to note that, if the client is reading
at a timestamp value lower than the one at which the server is writing,
it is not an issue. But if the client is reading at a timestamp at which the
server is about to write, then it is a problem. If servers
detect that a client is reading at a timestamp for which the server might have
in-flight writes (the ones which are only prepared), the servers reject
the write. CockroachDB throws an error if a read happens at
a timestamp for which there is an ongoing transaction.
Spanner reads have a phase where the client gets the
time of the last successful write on a particular partition. If a
client reads at a higher timestamp, the read requests wait until the writes
happen at that timestamp.
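
One simple way for a server to enforce this, a sketch loosely modelled on
CockroachDB's timestamp cache (the maxReadTimestamps map and the
checkWriteTimestamp method here are hypothetical), is to remember the
highest timestamp served to readers of each key and reject writes at or
below it:

  //Hypothetical sketch: refuse a write at or below a timestamp that has
  //already been served to a reader of the same key.
  private Map<String, Long> maxReadTimestamps = new ConcurrentHashMap<>();

  public String readOnlyGet(String key, long readTimestamp) {
      maxReadTimestamps.merge(key, readTimestamp, Math::max);
      return kv.get(new VersionedKey(key, readTimestamp));
  }

  private void checkWriteTimestamp(String key, long writeTimestamp) {
      long maxRead = maxReadTimestamps.getOrDefault(key, Long.MIN_VALUE);
      if (writeTimestamp <= maxRead) {
          throw new IllegalStateException("a write at " + writeTimestamp
                  + " would invalidate a read already served at " + maxRead);
      }
  }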

Using Replicated Log

To improve fault tolerance, cluster nodes use a Replicated Log.
The coordinator uses the Replicated Log to store the transaction log entries.

Considering the example of Alice and Bob in the above section,
the Blue servers will be a group of servers, as are the Green servers.
All the booking data will be replicated across a set of servers.
Each request which is part of the two-phase commit goes to the leader
of the server group. The replication is implemented using a
Replicated Log.

The client communicates with the leader of each server group.
The replication is necessary only when the client decides to commit the
transaction, so it happens as part of the prepare request.

The coordinator replicates every state change to the replicated log as well.

In a distributed datastore, each cluster node handles multiple partitions.
A Replicated Log is maintained per partition.
When Raft is used as part of the replication it is sometimes
called multi-raft.

The client communicates with the leader of each partition participating in
the transaction.
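
A minimal sketch of how a client-side mapper might route a key to the leader
of its partition's replication group (the Partition and RaftGroup types here
are hypothetical, as is this implementation of ReplicaMapper):

  //Hypothetical sketch: one raft group per partition; requests for a key
  //go to the leader of the group owning that key's partition.
  class ReplicaMapper {
      private Map<Partition, RaftGroup> raftGroups = new HashMap<>();
      private PartitioningStrategy partitioner;

      TransactionalKVStore serverFor(String key) {
          Partition partition = partitioner.partitionFor(key);
          return raftGroups.get(partition).getLeader();
      }
  }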

Failure Handling

The two-phase commit protocol heavily relies on the coordinator node
to communicate the outcome of the transaction.
Until the outcome of the transaction is known,
the individual cluster nodes cannot allow any other transactions
to write to the keys participating in the pending transaction.
The cluster nodes block until the outcome of the transaction is known.
This places some critical requirements on the coordinator.

The coordinator needs to remember the state of the transactions
even in case of a process crash.

The coordinator uses a Write-Ahead Log to record every update
to the state of the transaction.
This way, when the coordinator crashes and comes back up,
it can continue to work on the transactions which are incomplete.

class TransactionCoordinator…

  public void loadTransactionsFromWAL() throws IOException {
      List<WALEntry> walEntries = this.transactionLog.readAll();
      for (WALEntry walEntry : walEntries) {
          TransactionMetadata txnMetadata = (TransactionMetadata) Command.deserialize(new ByteArrayInputStream(walEntry.getData()));
          transactions.put(txnMetadata.getTxn(), txnMetadata);
      }
      startTransactionTimeoutScheduler();
      completePreparedTransactions();
  }

  private void completePreparedTransactions() throws IOException {
      List<Map.Entry<TransactionRef, TransactionMetadata>> preparedTransactions
              = transactions.entrySet().stream().filter(entry -> entry.getValue().isPrepared()).collect(Collectors.toList());
      for (Map.Entry<TransactionRef, TransactionMetadata> preparedTransaction : preparedTransactions) {
          TransactionMetadata txnMetadata = preparedTransaction.getValue();
          sendCommitMessageToParticipants(txnMetadata.getTxn());
      }
  }

The client can fail before sending the commit message to the coordinator.

The transaction coordinator tracks when each transaction state was updated.
If no state update is received within a configured timeout interval,
it triggers a transaction rollback.

class TransactionCoordinator…

  private ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
  private ScheduledFuture<?> taskFuture;
  private long transactionTimeoutMs = Long.MAX_VALUE; //for now.

  public void startTransactionTimeoutScheduler() {
      taskFuture = scheduler.scheduleAtFixedRate(() -> timeoutTransactions(),
              transactionTimeoutMs,
              transactionTimeoutMs,
              TimeUnit.MILLISECONDS);
  }

  private void timeoutTransactions() {
      for (TransactionRef txnRef : transactions.keySet()) {
          TransactionMetadata transactionMetadata = transactions.get(txnRef);
          long now = systemClock.nanoTime();
          if (transactionMetadata.hasTimedOut(now)) {
              sendRollbackMessageToParticipants(transactionMetadata.getTxn());
              transactionMetadata.markRollbackComplete(transactionLog);
          }
      }
  }

Transactions across heterogeneous systems

The solution outlined here demonstrates the two-phase commit implementation
in a homogeneous system, homogeneous meaning all the cluster nodes are part
of the same system and store the same type of data, for example
a distributed data store like MongoDB or a distributed message broker
like Kafka.

Historically, two-phase commit was mostly discussed in the context of
heterogeneous systems. The most common usage of two-phase commit was
with [XA] transactions. In J2EE servers, it is very
common to use two-phase commit across a message broker and a database.
The most common usage pattern is when a message needs to be produced
on a message broker like ActiveMQ or JMS and a record needs to be
inserted/updated in a database.

As seen in the above sections, the fault tolerance of the coordinator
plays a critical role in a two-phase commit implementation. In case of XA
transactions the coordinator is mostly the application process making
the database and message broker calls. The application in most modern
scenarios is a stateless microservice running in a containerized
environment. It is not really a suitable place to put the responsibility
of the coordinator. The coordinator needs to maintain state and recover
quickly from failures to commit or roll back, which is hard to
implement in this case.

This is the reason that while XA transactions seem so attractive, they
often run into issues
in practice
and are avoided. In the microservices
world, patterns like [transactional-outbox] are preferred over
XA transactions.

On the other hand, most distributed storage systems implement
two-phase commit across a set of partitions, and it works well in practice.
