In this section you will find the code for the entire `app.go` file.
package abci
import (
"context"
"crypto"
"encoding/json"
"errors"
"fmt"
"strings"
"github.com/dgraph-io/badger/v4"
"github.com/cometbft/cometbft/abci/tutorials/abci-v2-forum-app/model"
abci "github.com/cometbft/cometbft/abci/types"
cryptoencoding "github.com/cometbft/cometbft/crypto/encoding"
"github.com/cometbft/cometbft/libs/log"
"github.com/cometbft/cometbft/version"
)
const (
// ApplicationVersion is the value reported as AppVersion in the Info response.
ApplicationVersion = 1
// CurseWordsLimitVE is the maximum number of '|'-separated entries a vote
// extension may contain; VerifyVoteExtension rejects extensions with more.
CurseWordsLimitVE = 10
)
// ForumApp is the ABCI application of the forum tutorial. It embeds
// abci.BaseApplication and overrides the ABCI methods defined in this file.
type ForumApp struct {
abci.BaseApplication
// valAddrToPubKeyMap maps a validator address (raw address bytes converted
// to a string) to that validator's public key.
valAddrToPubKeyMap map[string]crypto.PublicKey
// CurseWords is the curse-word list taken from the app config; it is sent
// verbatim as this node's vote extension in ExtendVote.
// NOTE(review): presumably a '|'-separated list, matching how vote
// extensions are split elsewhere in this file - confirm against LoadConfig.
CurseWords string
// state is the application state (block height, tx counter, DB handle).
state AppState
// onGoingBlock is the badger transaction opened in FinalizeBlock and
// committed in Commit.
onGoingBlock *badger.Txn
// logger is used for all application logging.
logger log.Logger
}
// NewForumApp creates a ForumApp backed by the database located in dbDir and
// configured from the file at appConfigPath.
//
// It opens the database, loads the configuration (de-duplicating its curse
// words), restores the persisted application state and rebuilds the in-memory
// validator address -> public key map from the validators stored in the DB.
// The first failure is returned as an error and no application is created.
func NewForumApp(dbDir string, appConfigPath string, logger log.Logger) (*ForumApp, error) {
	db, err := model.NewDB(dbDir)
	if err != nil {
		return nil, fmt.Errorf("error initializing database: %w", err)
	}

	cfg, err := LoadConfig(appConfigPath)
	if err != nil {
		return nil, fmt.Errorf("error loading config file: %w", err)
	}
	cfg.CurseWords = DeduplicateCurseWords(cfg.CurseWords)

	state, err := loadState(db)
	if err != nil {
		return nil, err
	}

	// CometBFT expects the application to keep the validator set in memory,
	// so it is rebuilt here from what was persisted.
	storedValidators, err := state.DB.GetValidators()
	if err != nil {
		return nil, fmt.Errorf("can't load validators: %w", err)
	}

	pubKeysByAddr := make(map[string]crypto.PublicKey)
	for _, val := range storedValidators {
		key, decodeErr := cryptoencoding.PubKeyFromTypeAndBytes(val.PubKeyType, val.PubKeyBytes)
		if decodeErr != nil {
			return nil, fmt.Errorf("can't decode public key: %w", decodeErr)
		}
		pubKeysByAddr[string(key.Address())] = key
	}

	app := &ForumApp{
		state:              state,
		valAddrToPubKeyMap: pubKeysByAddr,
		CurseWords:         cfg.CurseWords,
		logger:             logger,
	}
	return app, nil
}
// Info reports the ABCI version, the application version and the height and
// app hash of the last block recorded in the application state.
func (app *ForumApp) Info(_ context.Context, _ *abci.InfoRequest) (*abci.InfoResponse, error) {
	info := &abci.InfoResponse{
		Version:          version.ABCIVersion,
		AppVersion:       ApplicationVersion,
		LastBlockHeight:  app.state.Height,
		LastBlockAppHash: app.state.Hash(),
	}
	return info, nil
}
// Query answers read-only requests against the application state.
//
// The query data is interpreted as a key: the literal "history" returns the
// whole chat history, any other value is treated as a sender name and returns
// that sender's messages encoded as JSON. The result is placed in both the Log
// and Value fields of the response, and the query data is echoed back as Key.
func (app *ForumApp) Query(_ context.Context, query *abci.QueryRequest) (*abci.QueryResponse, error) {
	app.logger.Info("Executing Application Query")

	switch key := string(query.Data); key {
	case "history":
		history, err := model.FetchHistory(app.state.DB)
		if err != nil {
			return nil, err
		}
		return &abci.QueryResponse{
			Key:   query.Data,
			Log:   history,
			Value: []byte(history),
		}, nil

	default:
		// Anything else is a sender name: fetch everything that sender posted.
		senderMsgs, err := model.GetMessagesBySender(app.state.DB, key)
		if err != nil {
			return nil, err
		}
		encoded, err := json.Marshal(senderMsgs)
		if err != nil {
			return nil, err
		}
		return &abci.QueryResponse{
			Key:   query.Data,
			Log:   string(encoded),
			Value: encoded,
		}, nil
	}
}
// CheckTx validates an inbound transaction before it is admitted.
//
// A transaction is refused (with a non-OK code and a nil error) when it cannot
// be parsed as a message, when its sender is empty, when looking up the sender
// fails for a reason other than "key not found", or when the sender is marked
// as banned. A sender that is not yet present in the database is accepted.
func (app *ForumApp) CheckTx(_ context.Context, req *abci.CheckTxRequest) (*abci.CheckTxResponse, error) {
	app.logger.Info("Executing Application CheckTx")

	msg, err := model.ParseMessage(req.Tx)
	if err != nil {
		app.logger.Info("CheckTx: failed to parse transaction message", "message", msg, "error", err)
		return &abci.CheckTxResponse{Code: CodeTypeInvalidTxFormat, Log: "Invalid transaction", Info: err.Error()}, nil
	}

	if len(msg.Sender) == 0 {
		app.logger.Info("CheckTx: failed to parse transaction message", "message", msg, "error", "Sender is missing")
		return &abci.CheckTxResponse{Code: CodeTypeInvalidTxFormat, Log: "Invalid transaction", Info: "Sender is missing"}, nil
	}

	app.logger.Debug("searching for sender", "sender", msg.Sender)
	user, lookupErr := app.state.DB.FindUserByName(msg.Sender)
	switch {
	case lookupErr == nil:
		// Known sender: only a banned one is turned away.
		if user != nil && user.Banned {
			return &abci.CheckTxResponse{Code: CodeTypeBanned, Log: "Invalid transaction", Info: "User is banned"}, nil
		}
	case errors.Is(lookupErr, badger.ErrKeyNotFound):
		// Unknown sender: not an error, the transaction is still accepted.
		app.logger.Info("CheckTx: Sender not found", "sender", msg.Sender)
	default:
		app.logger.Error("CheckTx: Error in check tx", "tx", string(req.Tx), "error", lookupErr)
		return &abci.CheckTxResponse{Code: CodeTypeEncodingError, Log: "Invalid transaction", Info: lookupErr.Error()}, nil
	}

	app.logger.Info("CheckTx: success checking tx", "message", msg.Message, "sender", msg.Sender)
	return &abci.CheckTxResponse{Code: CodeTypeOK, Log: "Valid transaction", Info: "Transaction validation succeeded"}, nil
}
// Consensus Connection

// InitChain is called with the genesis information sent by CometBFT. It
// registers every validator from the request via updateValidator, forces vote
// extensions to be enabled from height 1 and returns the (modified) consensus
// parameters together with the current app hash.
func (app *ForumApp) InitChain(_ context.Context, req *abci.InitChainRequest) (*abci.InitChainResponse, error) {
	app.logger.Info("Executing Application InitChain")

	for _, val := range req.Validators {
		if err := app.updateValidator(val); err != nil {
			return nil, err
		}
	}

	resp := &abci.InitChainResponse{AppHash: app.state.Hash()}

	// Vote extensions are switched on from the first height. The same
	// parameter can alternatively be set in the genesis file.
	req.ConsensusParams.Feature.VoteExtensionsEnableHeight.Value = 1
	resp.ConsensusParams = req.ConsensusParams

	return resp, nil
}
// PrepareProposal builds the list of transactions proposed for the next block.
//
// The curse words agreed on through the vote extensions of the last commit are
// used to screen every transaction in the request:
//   - a transaction whose message contains a curse word is dropped and replaced
//     by a single "ban transaction" for its sender (at most one per sender, no
//     matter how many offending transactions that sender has in the request);
//   - every other transaction is kept, unless its sender got banned by another
//     transaction of the same request.
//
// The resulting proposal lists all ban transactions first, followed by the
// surviving transactions in their original order; ProcessProposal relies on
// this layout.
//
// An error is returned if the curse-word list contains duplicates, if a
// transaction cannot be parsed, or if a ban transaction cannot be encoded.
func (app *ForumApp) PrepareProposal(_ context.Context, req *abci.PrepareProposalRequest) (*abci.PrepareProposalResponse, error) {
	app.logger.Info("Executing Application PrepareProposal")

	// Get the curse words from all vote extensions received at the end of the last height.
	voteExtensionCurseWords := app.getWordsFromVe(req.LocalLastCommit.Votes)
	if hasDuplicateWords(strings.Split(voteExtensionCurseWords, "|")) {
		return nil, errors.New("duplicate words found")
	}

	// pendingTx remembers the sender of a clean transaction so that it does
	// not have to be parsed a second time when filtering out banned senders.
	type pendingTx struct {
		raw    []byte
		sender string
	}

	finalProposal := make([][]byte, 0, len(req.Txs))
	pending := make([]pendingTx, 0, len(req.Txs))
	bannedUsers := make(map[string]struct{})

	for _, tx := range req.Txs {
		msg, err := model.ParseMessage(tx)
		if err != nil {
			// this should never happen since the tx should have been validated by CheckTx
			return nil, fmt.Errorf("failed to marshal tx in PrepareProposal: %w", err)
		}

		if !hasCurseWord(msg.Message, voteExtensionCurseWords) {
			pending = append(pending, pendingTx{raw: tx, sender: msg.Sender})
			continue
		}

		// The message contains a curse word: the sender gets banned. One ban
		// transaction per sender is enough, so repeated offenders are skipped.
		if _, alreadyBanned := bannedUsers[msg.Sender]; alreadyBanned {
			continue
		}
		bannedUsers[msg.Sender] = struct{}{}

		banBytes, err := json.Marshal(model.BanTx{UserName: msg.Sender})
		if err != nil {
			// this should never happen: the ban tx is built locally from a parsed sender name
			return nil, fmt.Errorf("failed to marshal ban tx in PrepareProposal: %w", err)
		}
		finalProposal = append(finalProposal, banBytes)
	}

	// A sender may have been banned after one of its earlier transactions was
	// already accepted above; keep only transactions of senders NOT banned.
	for _, p := range pending {
		if _, banned := bannedUsers[p.sender]; !banned {
			finalProposal = append(finalProposal, p.raw)
		}
	}

	return &abci.PrepareProposalResponse{Txs: finalProposal}, nil
}
// ProcessProposal validates a proposed block and answers with ACCEPT or REJECT.
//
// The expected layout is the one produced by PrepareProposal: a (possibly
// empty) prefix of ban transactions followed only by regular messages. The
// proposal is rejected when
//   - a ban transaction in the prefix cannot be decoded,
//   - a transaction after the prefix cannot be parsed as a message (this also
//     catches ban transactions that appear out of place), or
//   - a message was sent by a user banned in the same proposal.
//
// An invalid proposal is an expected input (e.g. from a faulty proposer), so
// it is reported through the REJECT status with a nil error; the cause is
// logged instead.
// NOTE(review): the host is understood to treat a non-nil error returned from
// a consensus-connection method as an application failure - confirm against
// the CometBFT ABCI specification.
func (app *ForumApp) ProcessProposal(_ context.Context, req *abci.ProcessProposalRequest) (*abci.ProcessProposalResponse, error) {
	app.logger.Info("Executing Application ProcessProposal")

	bannedUsers := make(map[string]struct{})

	// Consume the leading ban transactions and remember who got banned.
	finishedBanTxIdx := len(req.Txs)
	for i, tx := range req.Txs {
		if !isBanTx(tx) {
			finishedBanTxIdx = i
			break
		}

		var parsedBan model.BanTx
		if err := json.Unmarshal(tx, &parsedBan); err != nil {
			app.logger.Error("ProcessProposal: rejecting proposal, malformed ban tx", "error", err)
			return &abci.ProcessProposalResponse{Status: abci.PROCESS_PROPOSAL_STATUS_REJECT}, nil
		}
		bannedUsers[parsedBan.UserName] = struct{}{}
	}

	for _, tx := range req.Txs[finishedBanTxIdx:] {
		// From this point on, there should be no BanTxs anymore.
		// If there is one, ParseMessage will return an error as the
		// format of the two transactions is different.
		msg, err := model.ParseMessage(tx)
		if err != nil {
			app.logger.Error("ProcessProposal: rejecting proposal, malformed message tx", "error", err)
			return &abci.ProcessProposalResponse{Status: abci.PROCESS_PROPOSAL_STATUS_REJECT}, nil
		}

		if _, banned := bannedUsers[msg.Sender]; banned {
			// The proposer included a tx from a user it bans in the same block.
			return &abci.ProcessProposalResponse{Status: abci.PROCESS_PROPOSAL_STATUS_REJECT}, nil
		}
	}

	return &abci.ProcessProposalResponse{Status: abci.PROCESS_PROPOSAL_STATUS_ACCEPT}, nil
}
// FinalizeBlock executes the transactions of a decided block.
//
// A new read-write database transaction is opened and stored in
// app.onGoingBlock; all writes of this block go through it and are only
// persisted when Commit is called. The block is processed in two phases that
// mirror the layout checked by ProcessProposal:
//  1. the leading ban transactions mark their user as banned;
//  2. every remaining transaction is a message: its sender is created or
//     updated (not banned), the message is appended to the sender's messages
//     and to the global chat history, and the state's Size is incremented.
//
// Finally the state's Height is set to the block height and the per-tx results
// are returned together with the new app hash. Any failure aborts processing
// and is returned as an error.
//
// NOTE(review): AppendToExistingMessages and AppendToChat are given the DB
// handle rather than the ongoing transaction; whether they see writes made
// earlier in the same block cannot be told from here - verify.
func (app *ForumApp) FinalizeBlock(_ context.Context, req *abci.FinalizeBlockRequest) (*abci.FinalizeBlockResponse, error) {
	app.logger.Info("Executing Application FinalizeBlock")

	app.onGoingBlock = app.state.DB.GetDB().NewTransaction(true)
	results := make([]*abci.ExecTxResult, len(req.Txs))

	// Phase 1: ban transactions sit at the front of the block.
	firstMsgIdx := 0
	for ; firstMsgIdx < len(req.Txs); firstMsgIdx++ {
		rawTx := req.Txs[firstMsgIdx]
		if !isBanTx(rawTx) {
			break
		}

		ban := new(model.BanTx)
		if err := json.Unmarshal(rawTx, &ban); err != nil {
			// since we did this in ProcessProposal this should never happen here
			return nil, err
		}
		if err := UpdateOrSetUser(app.state.DB, ban.UserName, true, app.onGoingBlock); err != nil {
			return nil, err
		}
		results[firstMsgIdx] = &abci.ExecTxResult{Code: CodeTypeOK}
	}

	// Phase 2: everything after the ban prefix must be a regular message.
	// A stray BanTx fails here because ParseMessage expects another format.
	for i := firstMsgIdx; i < len(req.Txs); i++ {
		msg, err := model.ParseMessage(req.Txs[i])
		if err != nil {
			// since we did this in ProcessProposal this should never happen here
			return nil, err
		}

		// Make sure the sender exists (creating it if necessary), not banned.
		if err = UpdateOrSetUser(app.state.DB, msg.Sender, false, app.onGoingBlock); err != nil {
			return nil, err
		}

		// Store the sender's messages including the new one.
		senderMsgs, err := model.AppendToExistingMessages(app.state.DB, *msg)
		if err != nil {
			return nil, err
		}
		if err = app.onGoingBlock.Set([]byte(msg.Sender+"msg"), []byte(senderMsgs)); err != nil {
			return nil, err
		}

		// Store the global chat history including the new message.
		history, err := model.AppendToChat(app.state.DB, *msg)
		if err != nil {
			return nil, err
		}
		if err = app.onGoingBlock.Set([]byte("history"), []byte(history)); err != nil {
			return nil, err
		}

		// Nothing written above is committed or persisted until Commit is called.
		results[i] = &abci.ExecTxResult{Code: abci.CodeTypeOK}
		app.state.Size++
	}

	app.state.Height = req.Height

	return &abci.FinalizeBlockResponse{TxResults: results, AppHash: app.state.Hash()}, nil
}
// Commit persists the block executed by FinalizeBlock: it commits the ongoing
// database transaction and then saves the application state. The first error
// encountered is returned.
func (app *ForumApp) Commit(_ context.Context, _ *abci.CommitRequest) (*abci.CommitResponse, error) {
	app.logger.Info("Executing Application Commit")

	err := app.onGoingBlock.Commit()
	if err != nil {
		return nil, err
	}

	if err = saveState(&app.state); err != nil {
		return nil, err
	}

	return &abci.CommitResponse{}, nil
}
// ExtendVote attaches this node's configured curse words, unchanged, to its
// vote as the vote extension.
func (app *ForumApp) ExtendVote(_ context.Context, _ *abci.ExtendVoteRequest) (*abci.ExtendVoteResponse, error) {
	app.logger.Info("Executing Application ExtendVote")

	resp := &abci.ExtendVoteResponse{VoteExtension: []byte(app.CurseWords)}
	return resp, nil
}
// VerifyVoteExtension checks a vote extension received from another validator.
//
// The extension is read as a '|'-separated list of curse words. It is rejected
// when the list contains duplicates or holds more than CurseWordsLimitVE
// entries, and accepted otherwise. An error is returned when the sending
// validator's address is not in the application's validator map.
func (app *ForumApp) VerifyVoteExtension(_ context.Context, req *abci.VerifyVoteExtensionRequest) (*abci.VerifyVoteExtensionResponse, error) {
	app.logger.Info("Executing Application VerifyVoteExtension")

	_, known := app.valAddrToPubKeyMap[string(req.ValidatorAddress)]
	if !known {
		// we do not have a validator with this address mapped; this should never happen
		return nil, errors.New("unknown validator")
	}

	words := strings.Split(string(req.VoteExtension), "|")

	status := abci.VERIFY_VOTE_EXTENSION_STATUS_ACCEPT
	if hasDuplicateWords(words) || len(words) > CurseWordsLimitVE {
		// Either a repeated word or more words than the allowed limit.
		status = abci.VERIFY_VOTE_EXTENSION_STATUS_REJECT
	}

	return &abci.VerifyVoteExtensionResponse{Status: status}, nil
}
// getWordsFromVe extracts the agreed-upon curse words from a set of vote
// extensions and returns them as a single string joined with '|'.
//
// Each vote extension is read as a '|'-separated word list. For every vote a
// word is counted at most once, and empty words (e.g. from an empty extension
// or a stray separator) are ignored. A word is selected when it appears in
// more than len(validators)/3 votes (integer division). The returned string
// contains each selected word exactly once, in no particular order, and is
// empty when no word qualifies.
func (app *ForumApp) getWordsFromVe(voteExtensions []abci.ExtendedVoteInfo) string {
	curseWordMap := make(map[string]int)
	for _, vote := range voteExtensions {
		// Words already counted for this vote, so that a validator cannot
		// boost a word by repeating it inside its own extension.
		seen := make(map[string]struct{})
		for _, word := range strings.Split(string(vote.GetVoteExtension()), "|") {
			if word == "" {
				// strings.Split yields "" for an empty extension; that is not a word.
				continue
			}
			if _, dup := seen[word]; dup {
				continue
			}
			seen[word] = struct{}{}
			curseWordMap[word]++
		}
	}
	app.logger.Info("Processed vote extensions", "curse_words", curseWordMap)

	// A word needs strictly more votes than one third of the known validators.
	majority := len(app.valAddrToPubKeyMap) / 3

	selected := make([]string, 0, len(curseWordMap))
	for word, count := range curseWordMap {
		if count > majority {
			selected = append(selected, word)
		}
	}
	return strings.Join(selected, "|")
}
// hasDuplicateWords reports whether any word occurs more than once in words.
// An empty or nil slice has no duplicates.
func hasDuplicateWords(words []string) bool {
	seen := make(map[string]struct{}, len(words))
	for i := range words {
		if _, dup := seen[words[i]]; dup {
			return true
		}
		seen[words[i]] = struct{}{}
	}
	return false
}
**Explanation of code:**

The state of the app is stored in an `AppState` struct which contains the current height, hash and a BadgerDB instance.

The `InitChain` function initializes the validators from the CometBFT request and loads the initial state.

For an explanation of the other functions, such as `PrepareProposal`, `ProcessProposal`, `FinalizeBlock`, `ExtendVote` and `VerifyVoteExtension`, please refer to the previous sections.

In the next section, you will learn how to run the application