
What Is the Two-Phase Commit Protocol in Distributed Systems?

Learning how to implement the two-phase commit protocol in Golang: Part 1 (free tutorial)

Dipto Chakrabarty
7 min read · Jan 19, 2025
The 2 Phase Commit Protocol

The two-phase commit (2PC) protocol is one of the most important protocols in distributed systems. In this free blog post we are going to see how to implement it in Golang.

I learned about it while taking a distributed systems course at CMU, and it is one of the popular concepts to know for distributed systems interviews.

Feel free to code along with this project and add it to your resume. It covers a popular area of distributed systems and is easy for newcomers to pick up.

What is the two-phase commit?

According to Wikipedia:

The two-phase commit protocol (2PC, tupac) is a type of atomic commitment protocol (ACP). It is a distributed algorithm that coordinates all the processes that participate in a distributed atomic transaction on whether to commit or abort (roll back) the transaction.

In other words, the two-phase commit protocol gets several workers to agree on whether to commit a transaction or abort it.

A central coordinator sits between all the workers, collects their responses, and makes the final call on whether to commit or abort the transaction.

The decision is based on the state of the worker nodes and the votes they send back.

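The phases in two-phase commit

The protocol runs in two phases: a voting (prepare) phase, in which the workers vote, and a decision phase, in which the coordinator tells everyone to commit or abort. It is important to have a clear idea of the messages exchanged in each.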

Prepare: When a transaction arrives at the coordinator, the coordinator asks every worker whether it is prepared to commit the transaction.

This is called the prepare phase, in which the coordinator sends a Prepare message to each worker.

VoteCommit: Sent back by a worker if it is prepared to commit the transaction.

VoteAbort: Sent back by a worker if it is not prepared to commit the transaction.

DoCommit: Once the coordinator has received VoteCommit from every worker, it tells all of them to commit the transaction.

DoAbort: If any worker sends VoteAbort, the coordinator abandons the transaction and tells all workers to cancel it.

Timeout: If any worker is down and does not respond in time, the coordinator treats the missing vote as a timeout and aborts the transaction. This keeps a single unresponsive worker from stalling the whole system.
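The rules above boil down to one decision: commit only on a unanimous VoteCommit. Here is a minimal Go sketch of that rule, assuming votes are collected into a map keyed by worker ID and that a timed-out worker simply has no entry in the map. The names Vote, Decision and decide are hypothetical and do not appear in the project code.

```go
package main

import "fmt"

// Vote is what a worker sends back in the prepare phase.
type Vote int

const (
	VoteCommit Vote = iota
	VoteAbort
)

// Decision is what the coordinator broadcasts in the second phase.
type Decision int

const (
	DoCommit Decision = iota
	DoAbort
)

func (d Decision) String() string {
	if d == DoCommit {
		return "DoCommit"
	}
	return "DoAbort"
}

// decide commits only if all numWorkers workers voted VoteCommit.
// A worker that timed out has no entry in votes, so a missing vote
// is treated the same way as VoteAbort.
func decide(votes map[int]Vote, numWorkers int) Decision {
	if len(votes) < numWorkers {
		return DoAbort // at least one worker did not answer in time
	}
	for _, v := range votes {
		if v != VoteCommit {
			return DoAbort // at least one worker voted to abort
		}
	}
	return DoCommit
}

func main() {
	fmt.Println(decide(map[int]Vote{1: VoteCommit, 2: VoteCommit}, 2)) // DoCommit
	fmt.Println(decide(map[int]Vote{1: VoteCommit, 2: VoteAbort}, 2))  // DoAbort
	fmt.Println(decide(map[int]Vote{1: VoteCommit}, 2))                // DoAbort (timeout)
}
```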

Let's look at a few examples of two-phase commit.

Successful Commit

A complete successful run
C: Sends Prepare
W1: Sends VoteCommit
W2: Sends VoteCommit
C: Sees that everyone wants to commit and sends DoCommit
W1 and W2 commit

Worker Wants to Abort

When a Worker wants to Abort
C: Sends Prepare
W1: Sends VoteCommit
W2: Sends VoteAbort
C: Sees that one of the workers wants to abort and sends DoAbort
W1 and W2 do not commit

Worker times out

C: Sends Prepare
W1: Sends VoteCommit
W2: Is down due to a network issue
C: Sees that one of the workers did not respond in time and sends DoAbort
W1 and W2 do not commit

Step-by-step solution code in Golang

In this article we start with just the prepare phase of the protocol.

We are going to use RPC (Go's standard net/rpc package) for communication between the coordinator and the workers.

Note: install Go if you don't already have it on your system.

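Common Structs

We need a few structs, shared by the coordinator and the workers, for the prepare arguments and reply. The rest of the code refers to them through a shared package.

PrepareArgs carries the arguments the coordinator passes to the workers in the prepare phase: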

package shared

// PrepareArgs is sent by the coordinator in the prepare phase.
type PrepareArgs struct {
	TransactionID int
}

PrepareReply is the reply the coordinator receives from each worker:

type PrepareReply struct {
	Ok            bool // true if worker can prepare
	WorkerId      int
	TransactionID int
}
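Why are all of these fields exported? net/rpc encodes arguments and replies with encoding/gob by default, and gob only transmits exported fields. The sketch below round-trips a PrepareReply through gob to show what actually crosses the wire; roundTrip is a hypothetical helper, and the struct is copied locally so the example stands alone.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Local copy of the shared PrepareReply struct.
type PrepareReply struct {
	Ok            bool
	WorkerId      int
	TransactionID int
}

// roundTrip encodes and decodes a reply with encoding/gob, the codec that
// net/rpc uses by default, and returns the decoded copy.
func roundTrip(in PrepareReply) (PrepareReply, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(in); err != nil {
		return PrepareReply{}, err
	}
	var out PrepareReply
	err := gob.NewDecoder(&buf).Decode(&out)
	return out, err
}

func main() {
	out, err := roundTrip(PrepareReply{Ok: true, WorkerId: 2, TransactionID: 1})
	fmt.Println(out, err) // {true 2 1} <nil>
}
```

Renaming a field to lowercase (for example ok instead of Ok) would make gob silently drop it, so the coordinator would always see the zero value.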

The Coordinator

Define a Coordinator struct that acts as an RPC client: it sends requests to the servers (the workers) and reads their responses.

package coordinator

import (
	"fmt"
	"log"
	"net/rpc"
	"sync"
	"time"

	// assumed import path, based on the module path used in main.go
	"github.com/DiptoChakrabarty/2-phase-commit/shared"
)

type Coordinator struct {
	workerAddr    []string
	numWorkers    int
	prepareChan   chan shared.PrepareReply // VoteCommit replies from workers are passed here
	commitChan    chan bool                // workers commit reply is passed here
	abortChan     chan int                 // IDs of transactions to abort are passed here
	dialChan      chan *rpc.Client
	dialerrChan   chan error
	port          int
	mux           sync.Mutex   // guards the transactions map
	ticker        *time.Ticker // a ticker for simulating timeout
	transactions  map[int]int  // transaction ID -> number of VoteCommit replies
	transactionID int
	commitID      int
}

To actually send the prepare request to a worker, we define a method called SendPrepareTransaction:

func (cdt *Coordinator) SendPrepareTransaction(peer string, transactionId int) {
	client, err := cdt.callServerRPC(peer)
	if err != nil {
		// dial failed or timed out: treat the missing vote as VoteAbort.
		// log.Fatal would exit the process before the abort is recorded.
		log.Printf("unable to reach rpc server %s: %v", peer, err)
		cdt.abortChan <- transactionId
		return
	}
	defer client.Close()

	prepare := shared.PrepareArgs{TransactionID: transactionId}
	var prepareReply shared.PrepareReply
	err = client.Call("Worker.Prepare", &prepare, &prepareReply) // call the worker's Prepare method
	if err != nil {
		log.Printf("unable to reach worker %s with prepare: %v", peer, err)
		cdt.abortChan <- transactionId
		return // otherwise the empty reply below would trigger a second abort
	}

	log.Printf("received reply back from worker %s: %+v", peer, prepareReply)
	if prepareReply.Ok { // VoteCommit: forward the reply to the prepare channel
		log.Printf("worker %s is ready to accept transaction %d", peer, prepareReply.TransactionID)
		cdt.prepareChan <- prepareReply
	} else { // VoteAbort: forward the transaction ID to the abort channel
		log.Printf("worker %s is not ready to accept transaction %d", peer, prepareReply.TransactionID)
		cdt.abortChan <- transactionId
	}
}

callServerRPC returns an RPC client for a worker while also handling timeouts:

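func (cdt *Coordinator) callServerRPC(peer string) (*rpc.Client, error) {
	// Per-call buffered channels: a dial that finishes after its timeout can
	// never be handed to a later call for a different worker, as could happen
	// with channels shared across calls.
	clientCh := make(chan *rpc.Client, 1)
	errCh := make(chan error, 1)

	go func() {
		log.Printf("dialing worker at %s", peer)
		client, err := rpc.Dial("tcp", peer)
		if err != nil {
			errCh <- err // report the error instead of exiting with log.Fatal
			return
		}
		clientCh <- client
	}()

	// time.After starts a fresh 2s timer for every call; a shared Ticker fires
	// on its own schedule, so each call would get an unpredictable deadline.
	// Note: a client that connects after the timeout is never closed.
	select {
	case <-time.After(2 * time.Second):
		return nil, fmt.Errorf("timeout connecting to node %s", peer)
	case client := <-clientCh:
		return client, nil
	case err := <-errCh:
		return nil, err
	}
}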

To start the coordinator we define a Start function. For now, assume a single transaction with ID 1 is being simulated.

func Start(port int, workers []string) *Coordinator {
	coordinator := &Coordinator{
		port:         port,
		numWorkers:   len(workers),
		prepareChan:  make(chan shared.PrepareReply, len(workers)),
		commitChan:   make(chan bool, len(workers)),
		workerAddr:   workers,
		ticker:       time.NewTicker(2 * time.Second),
		dialChan:     make(chan *rpc.Client, len(workers)),
		dialerrChan:  make(chan error, len(workers)),
		abortChan:    make(chan int, len(workers)),
		transactions: make(map[int]int),
	}

	// A transaction with ID 1 is being simulated and has received no replies yet.
	// Set this before starting HandleRequest so the map is never written
	// concurrently with parsePrepareReply.
	coordinator.transactions[1] = 0

	go coordinator.HandleRequest()
	return coordinator
}

HandleRequest is the goroutine that synchronizes between the channels:

func (cdt *Coordinator) HandleRequest() {
	for {
		select {
		case id := <-cdt.abortChan:
			// abort handling is implemented in part 2; for now just log it
			log.Printf("received abort for transaction %d", id)
		case reply := <-cdt.prepareChan:
			log.Printf("got the reply %+v", reply)
			cdt.parsePrepareReply(reply)
		}
	}
}

For now, when we get a VoteCommit reply we call the parsePrepareReply method, which counts the reply against its transaction. For an abort we just log the transaction ID; we will implement it properly in part 2.

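// parsePrepareReply counts the VoteCommit replies for a transaction and
// commits once every worker has voted to commit.
func (cdt *Coordinator) parsePrepareReply(reply shared.PrepareReply) {
	cdt.mux.Lock()
	defer cdt.mux.Unlock()

	transactionID := reply.TransactionID
	cdt.transactions[transactionID]++
	log.Printf("transaction %d: %d of %d workers voted commit",
		transactionID, cdt.transactions[transactionID], cdt.numWorkers)
	if cdt.transactions[transactionID] == cdt.numWorkers {
		// number of VoteCommit equals the number of workers, so commit
		cdt.commitTransaction(transactionID)
	}
}

// commitTransaction is a placeholder until DoCommit is implemented.
func (cdt *Coordinator) commitTransaction(transactionId int) {
	fmt.Println("commit the id", transactionId)
}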

The coordinator signals a commit when the number of VoteCommit replies for a transaction ID equals the number of workers.

The actual process after commit will be showcased in part 3.
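One weakness of counting replies in a map[int]int is that a duplicated reply from the same worker (for example, if the coordinator ever re-sends Prepare) would be counted twice. Here is a hedged sketch of a tally keyed by WorkerId, which PrepareReply already carries; voteTally, newVoteTally and addCommitVote are hypothetical names, not part of the project.

```go
package main

import (
	"fmt"
	"sync"
)

// voteTally records which workers voted commit for each transaction,
// so a duplicated reply from the same worker cannot be counted twice.
type voteTally struct {
	mu         sync.Mutex
	numWorkers int
	votes      map[int]map[int]bool // transaction ID -> set of worker IDs
}

func newVoteTally(numWorkers int) *voteTally {
	return &voteTally{numWorkers: numWorkers, votes: make(map[int]map[int]bool)}
}

// addCommitVote records a VoteCommit and returns true only for the vote
// that completes the set, so the caller commits exactly once.
func (t *voteTally) addCommitVote(txID, workerID int) bool {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.votes[txID][workerID] {
		return false // duplicate reply: nothing changes
	}
	if t.votes[txID] == nil {
		t.votes[txID] = make(map[int]bool)
	}
	t.votes[txID][workerID] = true
	return len(t.votes[txID]) == t.numWorkers
}

func main() {
	tally := newVoteTally(2)
	fmt.Println(tally.addCommitVote(1, 1)) // false: one of two votes
	fmt.Println(tally.addCommitVote(1, 1)) // false: duplicate reply from worker 1
	fmt.Println(tally.addCommitVote(1, 2)) // true: both workers voted commit
	fmt.Println(tally.addCommitVote(1, 2)) // false: already complete
}
```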

The Workers

Define the worker struct

type Worker struct {
	mux  sync.Mutex // guards worker state
	id   int        // worker ID reported in replies
	port int        // port the worker listens on
}

Next, the worker has to listen for incoming RPC calls from clients (here, the coordinator), so we set that up in its Start function:

func Start(port int, id int) (*Worker, string) {
	w := &Worker{
		id:   id,
		port: port,
	}

	// Use a dedicated RPC server per worker. Registering "Worker" twice on the
	// package-level default server (rpc.RegisterName) fails for the second
	// worker, and both listeners would then serve worker 1.
	server := rpc.NewServer()
	if err := server.RegisterName("Worker", w); err != nil {
		log.Fatalf("Failed to register worker %d: %v", id, err)
	}

	// Start listening
	address := fmt.Sprintf("localhost:%d", port)
	listener, err := net.Listen("tcp", address)
	if err != nil {
		log.Fatalf("Failed to listen on %s: %v", address, err)
	}
	log.Printf("Worker %d listening on %s...", id, address)

	// Accept incoming connections in a loop
	go func() {
		for {
			conn, err := listener.Accept() // accept incoming connections
			if err != nil {
				log.Println("Accept error:", err)
				continue
			}
			log.Printf("Accepted connection from %s to %s", conn.RemoteAddr(), conn.LocalAddr())
			// Serve the connection in a new goroutine
			go server.ServeConn(conn)
		}
	}()
	return w, address
}

Finally, implement the Prepare method. For now every worker replies Ok = true, which is its VoteCommit:

// Prepare is invoked remotely as "Worker.Prepare".
func (w *Worker) Prepare(args *shared.PrepareArgs, reply *shared.PrepareReply) error {
	w.mux.Lock() // guards worker state once Prepare starts tracking transactions
	defer w.mux.Unlock()

	reply.Ok = true // reply true for all for now
	reply.WorkerId = w.id
	reply.TransactionID = args.TransactionID
	return nil
}
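To see the VoteAbort path from the worker side, here is a minimal sketch of a Prepare that can refuse. It assumes a hypothetical rule that a worker holds at most one prepared transaction at a time (similar in spirit to a participant holding locks until the decision arrives) and that transaction IDs start at 1, so 0 means "free". The prepared field is an assumption; releasing it on DoCommit or DoAbort belongs to the later phases.

```go
package main

import (
	"fmt"
	"sync"
)

// Local copies of the shared structs.
type PrepareArgs struct {
	TransactionID int
}

type PrepareReply struct {
	Ok            bool
	WorkerId      int
	TransactionID int
}

// Worker adds a hypothetical prepared field: the transaction the worker has
// promised to commit, or 0 when it is free.
type Worker struct {
	mux      sync.Mutex
	id       int
	prepared int
}

// Prepare votes commit (Ok = true) unless the worker is already holding a
// different prepared transaction, in which case it votes abort.
func (w *Worker) Prepare(args *PrepareArgs, reply *PrepareReply) error {
	w.mux.Lock()
	defer w.mux.Unlock()

	reply.WorkerId = w.id
	reply.TransactionID = args.TransactionID
	if w.prepared != 0 && w.prepared != args.TransactionID {
		reply.Ok = false // VoteAbort: busy with another transaction
		return nil
	}
	w.prepared = args.TransactionID // keep the promise until DoCommit/DoAbort
	reply.Ok = true                 // VoteCommit
	return nil
}

func main() {
	w := &Worker{id: 1}
	var r1, r2 PrepareReply
	w.Prepare(&PrepareArgs{TransactionID: 1}, &r1)
	w.Prepare(&PrepareArgs{TransactionID: 2}, &r2)
	fmt.Println(r1.Ok, r2.Ok) // true false
}
```

The method keeps the signature net/rpc requires (pointer reply, error result), so it could be registered under the "Worker" name in the same way as the original.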

Running the whole code

For this run we use one coordinator and two workers, all in the same process:

package main

import (
	"time"

	"github.com/DiptoChakrabarty/2-phase-commit/coordinator"
	"github.com/DiptoChakrabarty/2-phase-commit/worker"
)

func main() {
	_, address1 := worker.Start(8080, 1)
	_, address2 := worker.Start(8085, 2)
	addresses := []string{address1, address2}

	c := coordinator.Start(8000, addresses)
	c.SendPrepareTransaction(address1, 1)
	c.SendPrepareTransaction(address2, 1)

	// HandleRequest processes the replies in a background goroutine,
	// so give it a moment before main returns and the process exits.
	time.Sleep(time.Second)
}

This program starts two workers and one coordinator, then sends a prepare request for transaction 1 to each worker.

Run the command

go run main.go
The transaction is committed

Conclusion

The prepare phase is the simplest phase to implement, and this article has shown how to lay out the design of a two-phase commit system.

In the next article we will refine the codebase and implement abort handling along with DoCommit and DoAbort.

Stay tuned for more free guides like this one, and if you learned something from them, please leave a like.


Written by Dipto Chakrabarty

MS @CMU , Site Reliability Engineer , I talk about Cloud Distributed Systems. Tech Doctor making sure to diagnose and make your apps run smoothly in production.
