Derek's blog


  • 首页

  • 关于

  • 归档

  • 标签

Derek解读-Bytom源码分析-持久化存储LevelDB(二)-cache缓存

发表于 2018-08-23

Derek解读-Bytom源码分析-持久化存储LevelDB(二)-cache缓存

简介

https://github.com/Bytom/bytom

本章介绍Derek解读-Bytom源码分析-持久化存储LevelDB(二)-cache缓存

作者使用MacOS操作系统,其他平台也大同小异

Golang Version: 1.8

block cache缓存介绍

上一篇《Derek解读-Bytom源码分析-持久化存储LevelDB》文章介绍了比原链数据持久化到磁盘。当执行读取数据时从cache中得到,实现快速访问。

比原链的block cache主要实现有两个部分:

  • lru: 缓存淘汰算法
  • fillFn: 回调函数

比原链的block cache实现过程如下:

  1. 执行GetBlock函数
  2. 从block cache中查询,如果命中缓存则返回
  3. block cache中未命中则执行fillFn回调函数从磁盘中获取并缓存block cache中,然后返回

blockCache分析

blockCache结构

database/leveldb/cache.go

1
2
3
4
5
6
type blockCache struct {
mu sync.Mutex
lru *lru.Cache
fillFn func(hash *bc.Hash) *types.Block
single singleflight.Group
}
  • mu: 互斥锁,保证共享数据操作的一致性
  • lru: 缓存淘汰算法
  • fillFn: 回调函数
  • single: 回调函数的调用机制,同一时间保证只有一个回调函数在执行。

缓存命中和不命中的操作

database/leveldb/cache.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (c *blockCache) lookup(hash *bc.Hash) (*types.Block, error) {
if b, ok := c.get(hash); ok {
return b, nil
}

block, err := c.single.Do(hash.String(), func() (interface{}, error) {
b := c.fillFn(hash)
if b == nil {
return nil, fmt.Errorf("There are no block with given hash %s", hash.String())
}

c.add(b)
return b, nil
})
if err != nil {
return nil, err
}
return block.(*types.Block), nil
}

func (c *blockCache) get(hash *bc.Hash) (*types.Block, bool) {
c.mu.Lock()
block, ok := c.lru.Get(*hash)
c.mu.Unlock()
if block == nil {
return nil, ok
}
return block.(*types.Block), ok
}

lookup函数从block cache中查询,如果命中缓存则返回。如果block cache未命中则single.Do执行fillFn回调函数从磁盘中获取并缓存block cache中,然后返回

LRU缓存淘汰算法

LRU(Least recently used)算法根据数据的历史访问记录来进行淘汰数据,如果数据最近被访问过,那么缓存被命中的几率也更高。

logo

  1. 新的数据插入到链表头部
  2. 每当缓存命中,则将数据移到链表头部
  3. 当链表满的时候,将链表尾部的数据丢弃

LRU算法实现

LRU一般使用hash map和doubly linked list双向链表实现

实例化LRU CACHE

vendor/github.com/golang/groupcache/lru/lru.go

1
2
3
4
5
6
7
func New(maxEntries int) *Cache {
return &Cache{
MaxEntries: maxEntries,
ll: list.New(),
cache: make(map[interface{}]*list.Element),
}
}

  • MaxEntries:最大缓存条目数,如果为0则表示无限制
  • ll: 双向链表数据结构

LRU查询元素

1
2
3
4
5
6
7
8
9
10
func (c *Cache) Get(key Key) (value interface{}, ok bool) {
if c.cache == nil {
return
}
if ele, hit := c.cache[key]; hit {
c.ll.MoveToFront(ele)
return ele.Value.(*entry).value, true
}
return
}

Get获取元素,如果元素命中则将元素移至链表头部,保持最新的访问

LRU添加元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (c *Cache) Add(key Key, value interface{}) {
if c.cache == nil {
c.cache = make(map[interface{}]*list.Element)
c.ll = list.New()
}
if ee, ok := c.cache[key]; ok {
c.ll.MoveToFront(ee)
ee.Value.(*entry).value = value
return
}
ele := c.ll.PushFront(&entry{key, value})
c.cache[key] = ele
if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
c.RemoveOldest()
}
}

Add添加元素有三步操作:

  1. 如果当前缓存中存在该元素则将元素移至链表头并返回
  2. 如果缓存中不存在该元素则将元素插入链表头部
  3. 如果缓存的元素超过MaxEntries限制则移除链表最末尾的元素

Derek解读-Bytom源码分析-持久化存储LevelDB

发表于 2018-08-22

Derek解读-Bytom源码分析-持久化存储LevelDB

简介

https://github.com/Bytom/bytom

本章介绍Derek解读-Bytom源码分析-持久化存储LevelDB

作者使用MacOS操作系统,其他平台也大同小异

Golang Version: 1.8

LevelDB介绍

比原链默认使用leveldb数据库。Leveldb是一个google实现的非常高效的kv数据库。LevelDB是单进程的服务,性能非常之高,在一台4核Q6600的CPU机器上,每秒钟写数据超过40w,而随机读的性能每秒钟超过10w。
由于Leveldb是单进程服务,不能同时有多个进程进行对一个数据库进行读写。同一时间只能有一个进程,或一个进程多并发的方式进行读写。
比原链在数据存储层上存储所有链上地址、资产交易等信息。

LevelDB的增删改查操作

LevelDB是google开发的一个高性能K/V存储,本节我们介绍下LevelDB如何对LevelDB增删改查。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"

dbm "github.com/tendermint/tmlibs/db"
)

var (
Key = "TESTKEY"
LevelDBDir = "/tmp/data"
)

func main() {
db := dbm.NewDB("test", "leveldb", LevelDBDir)
defer db.Close()

db.Set([]byte(Key), []byte("This is a test."))

value := db.Get([]byte(Key))
if value == nil {
return
}
fmt.Printf("key:%v, value:%v\n", Key, string(value))

db.Delete([]byte(Key))
}

// Output
// key:TESTKEY, value:This is a test.

以上Output是执行该程序得到的输出结果。

该程序对leveld进行了增删改查操作。dbm.NewDB得到db对象,在/tmp/data目录下会生成一个叫test.db的目录。该目录存放该数据库的所有数据。
db.Set 设置key的value值,key不存在则新建,key存在则修改。
db.Get 得到key中value数据。
db.Delete 删除key及value的数据。

比原链的数据库

默认情况下,数据存储目录在–home参数下的data目录。以Darwin平台为例,默认数据库存储在 $HOME/Library/Bytom/data。

  • accesstoken.db token信息(钱包访问控制权限)
  • core.db 核心数据库,存储主链相关数据。包括块信息、交易信息、资产信息等
  • discover.db 分布式网络中端到端的节点信息
  • trusthistory.db
  • txdb.db 存储交易相关信息
  • txfeeds.db 目前比原链代码版本未使用该功能,暂不介绍
  • wallet.db 本地钱包数据库。存储用户、资产、交易、utox等信息

以上所有数据库都由database模块管理

比原数据库接口

在比原链中数据持久化存储由database模块管理,但是持久化相关接口在protocol/store.go中

1
2
3
4
5
6
7
8
9
10
11
12
13
type Store interface {
BlockExist(*bc.Hash) bool

GetBlock(*bc.Hash) (*types.Block, error)
GetStoreStatus() *BlockStoreState
GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
GetTransactionsUtxo(*state.UtxoViewpoint, []*bc.Tx) error
GetUtxo(*bc.Hash) (*storage.UtxoEntry, error)

LoadBlockIndex() (*state.BlockIndex, error)
SaveBlock(*types.Block, *bc.TransactionStatus) error
SaveChainStatus(*state.BlockNode, *state.UtxoViewpoint) error
}
  • BlockExist 根据hash判断区块是否存在
  • GetBlock 根据hash获取该区块
  • GetStoreStatus 获取store的存储状态
  • GetTransactionStatus 根据hash获取该块中所有交易的状态
  • GetTransactionsUtxo 缓存与输入txs相关的所有utxo
  • GetUtxo(*bc.Hash) 根据hash获取该块内的所有utxo
  • LoadBlockIndex 加载块索引,从db中读取所有block header信息并缓存在内存中
  • SaveBlock 存储块和交易状态
  • SaveChainStatus 设置主链的状态,当节点第一次启动时,节点会根据key为blockStore的内容判断是否初始化主链。

比原链数据库key前缀

database/leveldb/store.go

1
2
3
4
5
6
var (
blockStoreKey = []byte("blockStore")
blockPrefix = []byte("B:")
blockHeaderPrefix = []byte("BH:")
txStatusPrefix = []byte("BTS:")
)
  • blockStoreKey 主链状态前缀
  • blockPrefix 块信息前缀
  • blockHeaderPrefix 块头信息前缀
  • txStatusPrefix 交易状态前缀

GetBlock查询块过程分析

database/leveldb/store.go

1
2
3
func (s *Store) GetBlock(hash *bc.Hash) (*types.Block, error) {
return s.cache.lookup(hash)
}

database/leveldb/cache.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (c *blockCache) lookup(hash *bc.Hash) (*types.Block, error) {
if b, ok := c.get(hash); ok {
return b, nil
}

block, err := c.single.Do(hash.String(), func() (interface{}, error) {
b := c.fillFn(hash)
if b == nil {
return nil, fmt.Errorf("There are no block with given hash %s", hash.String())
}

c.add(b)
return b, nil
})
if err != nil {
return nil, err
}
return block.(*types.Block), nil
}

GetBlock函数最终会执行lookup函数。lookup函数总共操作有两步:

  • 从缓存中查询hash值,如果查到则返回
  • 如果为从缓存中查询到则回调fillFn回调函数。fillFn回调函数会将从磁盘上获得到块信息存储到缓存中并返回该块的信息。

fillFn回调函数实际上调取的是database/leveldb/store.go下的GetBlock,它会从磁盘中获取block信息并返回。

Derek解读-Bytom源码分析-创世区块

发表于 2018-08-18

Derek解读-Bytom源码分析-创世区块

简介

https://github.com/Bytom/bytom

本章介绍Derek解读-Bytom源码分析-创世区块

作者使用MacOS操作系统,其他平台也大同小异

Golang Version: 1.8

创世区块介绍

区块链里的第一个区块创被称为创世区块。它是区块链里面所有区块的共同祖先。

在比原链中创世区块被硬编码到bytomd中,每一个比原节点都始于同一个创世区块,这能确保创世区块不会被改变。每个节点都把创世区块作为区块链的首区块,从而构建了一个安全的、可信的区块链。

获取创世区块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
./bytomcli get-block 0
{
"bits": 2161727821137910500,
"difficulty": "15154807",
"hash": "a75483474799ea1aa6bb910a1a5025b4372bf20bef20f246a2c2dc5e12e8a053",
"height": 0,
"nonce": 9253507043297,
"previous_block_hash": "0000000000000000000000000000000000000000000000000000000000000000",
"size": 546,
"timestamp": 1524549600,
"transaction_merkle_root": "58e45ceb675a0b3d7ad3ab9d4288048789de8194e9766b26d8f42fdb624d4390",
"transaction_status_hash": "c9c377e5192668bc0a367e4a4764f11e7c725ecced1d7b6a492974fab1b6d5bc",
"transactions": [
{
"id": "158d7d7c6a8d2464725d508fafca76f0838d998eacaacb42ccc58cfb0c155352",
"inputs": [
{
"amount": 0,
"arbitrary": "496e666f726d6174696f6e20697320706f7765722e202d2d204a616e2f31312f323031332e20436f6d707574696e6720697320706f7765722e202d2d204170722f32342f323031382e",
"asset_definition": {},
"asset_id": "0000000000000000000000000000000000000000000000000000000000000000",
"type": "coinbase"
}
],
"outputs": [
{
"address": "bm1q3jwsv0lhfmndnlag3kp6avpcq6pkd3xy8e5r88",
"amount": 140700041250000000,
"asset_definition": {},
"asset_id": "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
"control_program": "00148c9d063ff74ee6d9ffa88d83aeb038068366c4c4",
"id": "e3325bf07c4385af4b60ad6ecc682ee0773f9b96e1cfbbae9f0f12b86b5f1093",
"position": 0,
"type": "control"
}
],
"size": 151,
"status_fail": false,
"time_range": 0,
"version": 1
}
],
"version": 1
}

使用bytomcli客户端查询高度为0的区块信息。我们可以看到以上输出结果。

  • bits: 目标值,挖矿时计算的hash之后要小于等于的目标值则新块构建成功
  • difficulty: 难度值,矿工找到下一个有效区块的难度。该参数并不存储在区块链上,是由bits计算得出
  • hash: 当前区块hash
  • height: 当前区块高度
  • nonce: 随机数,挖矿时反复使用不同的nonce来生成不同哈希值
  • previous_block_hash: 当前区块的父区块hash值
  • size: 当前区块的字节数
  • timestamp: 出块时间
  • transaction_merkle_root: 创世区块的merkle树根节点
  • transactions: 当前块中的utxo交易

由于创世区块是第一个块,创世区块的父区块,也就是previous_block_hash参数,默认情况下为0000000000000000000000000000000000000000000000000000000000000000

时间戳timestamp为1524549600,时间为2018-04-24 14:00:00也就是比原链上主网的时间。

源码分析

获取区块链状态

protocol/protocol.go

1
2
3
4
5
6
7
8
9
10
11
func NewChain(store Store, txPool *TxPool) (*Chain, error) {
// ...
storeStatus := store.GetStoreStatus()
if storeStatus == nil {
if err := c.initChainStatus(); err != nil {
return nil, err
}
storeStatus = store.GetStoreStatus()
}
// ...
}

当我们第一次启动比原链节点时,store.GetStoreStatus会从db中获取存储状态,获取存储状态的过程是从LevelDB中查询key为blockStore的数据,如果查询出错则认为是第一次运行比原链节点,那么就需要初始化比原主链。

初始化主链

protocol/protocol.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (c *Chain) initChainStatus() error {
genesisBlock := config.GenesisBlock()
txStatus := bc.NewTransactionStatus()
for i := range genesisBlock.Transactions {
txStatus.SetStatus(i, false)
}

if err := c.store.SaveBlock(genesisBlock, txStatus); err != nil {
return err
}

utxoView := state.NewUtxoViewpoint()
bcBlock := types.MapBlock(genesisBlock)
if err := utxoView.ApplyBlock(bcBlock, txStatus); err != nil {
return err
}

node, err := state.NewBlockNode(&genesisBlock.BlockHeader, nil)
if err != nil {
return err
}
return c.store.SaveChainStatus(node, utxoView)
}

初始化主链有几步操作:

  • config.GenesisBlock()获取创世区块
  • 设置创世区块中所有交易状态
  • 存储创世区块到LevelDB
  • state.NewUtxoViewpoint()用于临时小部分utxo状态存储集合
  • 实例化BlockNode,BlockNode用于选择最佳链作为主链
  • 保存最新主链状态

被硬编码的创世区块

config/genesis.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func genesisTx() *types.Tx {
contract, err := hex.DecodeString("00148c9d063ff74ee6d9ffa88d83aeb038068366c4c4")
if err != nil {
log.Panicf("fail on decode genesis tx output control program")
}

txData := types.TxData{
Version: 1,
Inputs: []*types.TxInput{
types.NewCoinbaseInput([]byte("Information is power. -- Jan/11/2013. Computing is power. -- Apr/24/2018.")),
},
Outputs: []*types.TxOutput{
types.NewTxOutput(*consensus.BTMAssetID, consensus.InitialBlockSubsidy, contract),
},
}
return types.NewTx(txData)
}

func mainNetGenesisBlock() *types.Block {
tx := genesisTx()
txStatus := bc.NewTransactionStatus()
txStatus.SetStatus(0, false)
txStatusHash, err := bc.TxStatusMerkleRoot(txStatus.VerifyStatus)
if err != nil {
log.Panicf("fail on calc genesis tx status merkle root")
}

merkleRoot, err := bc.TxMerkleRoot([]*bc.Tx{tx.Tx})
if err != nil {
log.Panicf("fail on calc genesis tx merkel root")
}

block := &types.Block{
BlockHeader: types.BlockHeader{
Version: 1,
Height: 0,
Nonce: 9253507043297,
Timestamp: 1524549600,
Bits: 2161727821137910632,
BlockCommitment: types.BlockCommitment{
TransactionsMerkleRoot: merkleRoot,
TransactionStatusHash: txStatusHash,
},
},
Transactions: []*types.Tx{tx},
}
return block
}

mainNetGenesisBlock主要有如下操作:

  • 生成创世区块中的交易,默认就一笔交易
  • 设置块中的交易状态为false
  • 将创世区块设置为merkle树的根节点
  • 实例化Block块并返回

genesisTx函数生成创世区块中的交易,默认就一笔交易,一笔交易中包含input输入和output输出。

input输入:
输入中有一句话”Information is power. – Jan/11/2013. Computing is power. – Apr/24/2018.”这是为了纪念Aaron Swartz的精神

output输出:
输出中我们看到consensus.InitialBlockSubsidy创世区块的奖励。总共140700041250000000/1e8 = 1407000412。也就是14亿个BTM币。

计算即权力

引用比原链创始人长铗的话:

4月24号,我们主网上线,信息即权力,2013年Jaruary11;计算即权力,2018年April24。这句话是为了纪念Aaron Swartz的精神,信息即权力可以视为互联网宣言,致力于信息自由传播,让公民隐私得到保护。计算即权力,致力于让资产自由的交易,自由的流动,让公民的财富得到保护,我觉得这是非常好的纪念。

bytom源码分析-protobuf生成核心代码

发表于 2018-08-17

bytom源码分析-protobuf生成比原核心代码

简介

https://github.com/Bytom/bytom

本章介绍bytom代码Api-Server接口服务

作者使用MacOS操作系统,其他平台也大同小异

Golang Version: 1.8

protobuf生成比原核心代码

protobuf介绍

Protocol buffers是一个灵活的、高效的、自动化的用于对结构化数据进行序列化的协议。Protocol buffers序列化后的码流更小、速度更快、操作更简单。只需要将序列化的数据结构(.proto文件),便可以生成的源代码。

protobuf 3.0语法介绍

protobuf 语法

protobuf 安装

安装protobuf 3.4.0

protobuf download

1
2
3
4
./configure
make
make install
protoc —version

安装grpc-go

1
2
3
export PATH=$PATH:$GOPATH/bin
go get -u google.golang.org/grpc
go get -u github.com/golang/protobuf/protoc-gen-go

查看比原bc.proto核心文件

protocol/bc/bc.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
syntax = "proto3";

package bc;

message Hash {
fixed64 v0 = 1;
fixed64 v1 = 2;
fixed64 v2 = 3;
fixed64 v3 = 4;
}

message Program {
uint64 vm_version = 1;
bytes code = 2;
}

// This message type duplicates Hash, above. One alternative is to
// embed a Hash inside an AssetID. But it's useful for AssetID to be
// plain old data (without pointers). Another alternative is use Hash
// in any protobuf types where an AssetID is called for, but it's

// preferable to have type safety.
message AssetID {
fixed64 v0 = 1;
fixed64 v1 = 2;
fixed64 v2 = 3;
fixed64 v3 = 4;
}

message AssetAmount {
AssetID asset_id = 1;
uint64 amount = 2;
}

message AssetDefinition {
Program issuance_program = 1;
Hash data = 2;
}

message ValueSource {
Hash ref = 1;
AssetAmount value = 2;
uint64 position = 3;
}

message ValueDestination {
Hash ref = 1;
AssetAmount value = 2;
uint64 position = 3;
}

message BlockHeader {
uint64 version = 1;
uint64 height = 2;
Hash previous_block_id = 3;
uint64 timestamp = 4;
Hash transactions_root = 5;
Hash transaction_status_hash = 6;
uint64 nonce = 7;
uint64 bits = 8;
TransactionStatus transaction_status = 9;
}

message TxHeader {
uint64 version = 1;
uint64 serialized_size = 2;
uint64 time_range = 3;
repeated Hash result_ids = 4;
}

message TxVerifyResult {
bool status_fail = 1;
}

message TransactionStatus {
uint64 version = 1;
repeated TxVerifyResult verify_status = 2;
}

message Mux {
repeated ValueSource sources = 1; // issuances, spends, and muxes
Program program = 2;
repeated ValueDestination witness_destinations = 3; // outputs, retirements, and muxes
repeated bytes witness_arguments = 4;
}

message Coinbase {
ValueDestination witness_destination = 1;
bytes arbitrary = 2;
}

message Output {
ValueSource source = 1;
Program control_program = 2;
uint64 ordinal = 3;
}

message Retirement {
ValueSource source = 1;
uint64 ordinal = 2;
}

message Issuance {
Hash nonce_hash = 1;
AssetAmount value = 2;
ValueDestination witness_destination = 3;
AssetDefinition witness_asset_definition = 4;
repeated bytes witness_arguments = 5;
uint64 ordinal = 6;
}

message Spend {
Hash spent_output_id = 1;
ValueDestination witness_destination = 2;
repeated bytes witness_arguments = 3;
uint64 ordinal = 4;
}

根据bc.proto生成bc.pb.go代码

1
2
3
4
protoc -I/usr/local/include -I. \
-I${GOPATH}/src \
--go_out=plugins=grpc:. \
./*.proto

执行完上面命令,我们会看到当前目录下生成的bc.pb.go文件,该文件在比原链中承载这block、transaction、coinbase等重要数据结构

bytom源码分析-Api-Server接口服务

发表于 2018-08-16

bytom源码分析-Api-Server接口服务

简介

https://github.com/Bytom/bytom

本章介绍bytom代码Api-Server接口服务

作者使用MacOS操作系统,其他平台也大同小异

Golang Version: 1.8

Api-Server接口服务

Api Server是比原链中非常重要的一个功能,在比原链的架构中专门服务于bytomcli和dashboard,他的功能是接收并处理用户和矿池相关的请求。默认启动9888端口。总之主要功能如下:

  • 接收并处理用户或矿池发送的请求
  • 管理交易:打包、签名、提交等操作
  • 管理本地比原钱包
  • 管理本地p2p节点信息
  • 管理本地矿工挖矿操作等

在Api Server服务过程中,在监听地址listener上接收bytomcli或dashboard的请求访问。对每一个请求,Api Server均会创建一个新的goroutine来处理请求。首先Api Server读取请求内容,解析请求,接着匹配相应的路由项,随后调用路由项的Handler回调函数来处理。最后Handler处理完请求之后给bytomcli响应该请求。

Api-Server源码分析

在bytomd启动过程中,bytomd使用golang标准库http.NewServeMux()创建一个router路由器,提供请求的路由分发功能。创建Api Server主要有三部分组成:

  • 初始化http.NewServeMux()得到mux
  • 为mux.Handle添加多个有效的router路由项。每一个路由项由HTTP请求方法(GET、POST、PUT、DELET)、URL和Handler回调函数组成
  • 将监听地址作为参数,最终执行Serve(listener)开始服务于外部请求

创建Api对象

node/node.go

1
2
3
4
5
6
7
func (n *Node) initAndstartApiServer() {
n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens)

listenAddr := env.String("LISTEN", n.config.ApiAddress)
env.Parse()
n.api.StartServer(*listenAddr)
}

api/api.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func NewAPI(sync *netsync.SyncManager, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, cpuMiner *cpuminer.CPUMiner, miningPool *miningpool.MiningPool, chain *protocol.Chain, config *cfg.Config, token *accesstoken.CredentialStore) *API {
api := &API{
sync: sync,
wallet: wallet,
chain: chain,
accessTokens: token,
txFeedTracker: txfeeds,
cpuMiner: cpuMiner,
miningPool: miningPool,
}
api.buildHandler()
api.initServer(config)

return api
}

首先,实例化api对象。Api-server管理的事情很多,所以参数也相对较多。
listenAddr本地端口,如果系统没有设置LISTEN变量则使用config.ApiAddress配置地址,默认为9888

NewAPI函数我们看到有三个操作:

  1. 实例化api对象
  2. api.buildHandler添加router路由项
  3. api.initServer实例化http.Server,配置auth验证等

router路由项

1
2
3
4
5
6
7
8
9
10
11
12
func (a *API) buildHandler() {
walletEnable := false
m := http.NewServeMux()
if a.wallet != nil {
walletEnable = true

m.Handle("/create-account", jsonHandler(a.createAccount))
m.Handle("/list-accounts", jsonHandler(a.listAccounts))
m.Handle("/delete-account", jsonHandler(a.deleteAccount))
// ...
}
}

router路由项过多。这里只介绍关于账号相关的handler。其他的handler大同小异。

1
m.Handle("/create-account", jsonHandler(a.createAccount))

我们可以看到一条router项由url和对应的handle回调函数组成。当我们请求的url匹配到/create-account时,Api-Server会执行a.createAccount函数,并将用户的传参也带过去。

启动Api-Server服务

api/api.go

1
2
3
4
5
6
7
8
9
10
11
12
13
func (a *API) StartServer(address string) {
log.WithField("api address:", address).Info("Rpc listen")
listener, err := net.Listen("tcp", address)
if err != nil {
cmn.Exit(cmn.Fmt("Failed to register tcp port: %v", err))
}

go func() {
if err := a.server.Serve(listener); err != nil {
log.WithField("error", errors.Wrap(err, "Serve")).Error("Rpc server")
}
}()
}

通过golang标准库net.listen方法,监听本地的地址端口。由于http服务是一个持久运行的服务,我们启动一个go程专门运行http服务。当运行a.server.Serve没有任何报错时,我们可以看到服务器上启动的9888端口。此时Api-Server已经处于等待接收用户的请求。

bytom源码分析-孤块管理

发表于 2018-08-16

bytom源码分析-孤块管理

简介

https://github.com/Bytom/bytom

本章介绍bytom代码孤块管理

作者使用MacOS操作系统,其他平台也大同小异

Golang Version: 1.8

孤块介绍

什么是孤块

当节点收到了一个有效的区块,而在现有的主链中却未找到它的父区块,那么这个区块被认为是“孤块”。父区块是指当前区块的PreviousBlockHash字段指向上一区块的hash值。

接收到的孤块会被存储在孤块池中,直到它们的父区块被节点收到。一旦收到了父区块,节点就会将孤块从孤块池中取出,并且连接到它的父区块,让它作为区块链的一部分。

孤块出现的原因

当两个或多个区块在很短的时间间隔内被挖出来,节点有可能会以不同的顺序接收到它们,这个时候孤块现象就会出现。

我们假设有三个高度分别为100、101、102的块,分别以102、101、100的颠倒顺序被节点接收。此时节点将102、101放入到孤块管理缓存池中,等待彼此的父块。当高度为100的区块被同步进来时,会被验证区块和交易,然后存储到区块链上。这时会对孤块缓存池进行递归查询,根据高度为100的区块找到101的区块并存储到区块链上,再根据高度为101的区块找到102的区块并存储到区块链上。

孤块源码分析

孤块管理缓存池结构体

protocol/orphan_manage.go

1
2
3
4
5
6
7
8
9
10
11
12
type OrphanManage struct {
orphan map[bc.Hash]*types.Block
prevOrphans map[bc.Hash][]*bc.Hash
mtx sync.RWMutex
}

func NewOrphanManage() *OrphanManage {
return &OrphanManage{
orphan: make(map[bc.Hash]*types.Block),
prevOrphans: make(map[bc.Hash][]*bc.Hash),
}
}
  • orphan 存储孤块,key为block hash,value为block结构体
  • prevOrphans 存储孤块的父块
  • mtx 互斥锁,保护map结构在多并发读写状态下保持数据一致

添加孤块到缓存池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (o *OrphanManage) Add(block *types.Block) {
blockHash := block.Hash()
o.mtx.Lock()
defer o.mtx.Unlock()

if _, ok := o.orphan[blockHash]; ok {
return
}

o.orphan[blockHash] = block
o.prevOrphans[block.PreviousBlockHash] = append(o.prevOrphans[block.PreviousBlockHash], &blockHash)

log.WithFields(log.Fields{"hash": blockHash.String(), "height": block.Height}).Info("add block to orphan")
}

当一个孤块被添加到缓存池中,还需要记录该孤块的父块hash。用于父块hash的查询

查询孤块和父孤块

1
2
3
4
5
6
7
8
9
10
11
12
13
func (o *OrphanManage) Get(hash *bc.Hash) (*types.Block, bool) {
o.mtx.RLock()
block, ok := o.orphan[*hash]
o.mtx.RUnlock()
return block, ok
}

func (o *OrphanManage) GetPrevOrphans(hash *bc.Hash) ([]*bc.Hash, bool) {
o.mtx.RLock()
prevOrphans, ok := o.prevOrphans[*hash]
o.mtx.RUnlock()
return prevOrphans, ok
}

删除孤块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (o *OrphanManage) Delete(hash *bc.Hash) {
o.mtx.Lock()
defer o.mtx.Unlock()
block, ok := o.orphan[*hash]
if !ok {
return
}
delete(o.orphan, *hash)

prevOrphans, ok := o.prevOrphans[block.PreviousBlockHash]
if !ok || len(prevOrphans) == 1 {
delete(o.prevOrphans, block.PreviousBlockHash)
return
}

for i, preOrphan := range prevOrphans {
if preOrphan == hash {
o.prevOrphans[block.PreviousBlockHash] = append(prevOrphans[:i], prevOrphans[i+1:]...)
return
}
}
}

删除孤块的过程中,同时删除父块

孤块处理逻辑

protocol/block.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (c *Chain) processBlock(block *types.Block) (bool, error) {
blockHash := block.Hash()
if c.BlockExist(&blockHash) {
log.WithFields(log.Fields{"hash": blockHash.String(), "height": block.Height}).Info("block has been processed")
return c.orphanManage.BlockExist(&blockHash), nil
}

if parent := c.index.GetNode(&block.PreviousBlockHash); parent == nil {
c.orphanManage.Add(block)
return true, nil
}

if err := c.saveBlock(block); err != nil {
return false, err
}

bestBlock := c.saveSubBlock(block)
// ...
}

processBlock函数处理block块加入区块链上之前的过程。

c.BlockExist判断当前block块是否存在于区块链上或是否存在孤块缓存池中,如果存在则返回。

c.index.GetNode判断block块的父节点是否存在。如果在现有的主链中却未找到它的父区块则将block块添加到孤块缓存池。

c.saveBlock走到了这一步说明,block父节点是存在于区块链,则将block块存储到区块链。该函数会验证区块和交易有效性。

saveSubBlock 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

func (c *Chain) saveSubBlock(block *types.Block) *types.Block {
blockHash := block.Hash()
prevOrphans, ok := c.orphanManage.GetPrevOrphans(&blockHash)
if !ok {
return block
}

bestBlock := block
for _, prevOrphan := range prevOrphans {
orphanBlock, ok := c.orphanManage.Get(prevOrphan)
if !ok {
log.WithFields(log.Fields{"hash": prevOrphan.String()}).Warning("saveSubBlock fail to get block from orphanManage")
continue
}
if err := c.saveBlock(orphanBlock); err != nil {
log.WithFields(log.Fields{"hash": prevOrphan.String(), "height": orphanBlock.Height}).Warning("saveSubBlock fail to save block")
continue
}

if subBestBlock := c.saveSubBlock(orphanBlock); subBestBlock.Height > bestBlock.Height {
bestBlock = subBestBlock
}
}
return bestBlock
}

saveSubBlock 在孤块缓存池中查询是否存在当前区块的下一个区块。比如当前区块高度为100,则在孤块缓存池中查询是否有区块高度为101的区块。如果存在则将101区块存储到区块链并从孤块缓存池中删除该区块。

saveSubBlock是一个递归函数的实现。目的是为了寻找最深叶子节点的递归方式。比如当前区块高度为100的,递归查询出高度为99、98、97等高度的区块。

bytom源码分析-P2P网络-地址簿

发表于 2018-04-28

bytom源码分析-P2P网络-地址簿

简介

https://github.com/Bytom/bytom

本章介绍bytom代码P2P网络中addrbook地址簿

作者使用MacOS操作系统,其他平台也大同小异

Golang Version: 1.8

addrbook介绍

addrbook用于存储P2P网络中保留最近的对端节点地址
在MacOS下,默认的地址簿路径存储在~/Library/Bytom/addrbook.json

地址簿格式

~/Library/Bytom/addrbook.json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
"Key": "359be6d08bc0c6e21c84bbb2",
"Addrs": [
{
"Addr": {
"IP": "122.224.11.144",
"Port": 46657
},
"Src": {
"IP": "198.74.61.131",
"Port": 46657
},
"Attempts": 0,
"LastAttempt": "2018-05-04T12:58:23.894057702+08:00",
"LastSuccess": "0001-01-01T00:00:00Z",
"BucketType": 1,
"Buckets": [
181,
10
]
}
]
}

地址类型

在addrbook中存储的地址有两种:
p2p/addrbook.go

1
2
3
4
const (
bucketTypeNew = 0x01 // 标识新地址,不可靠地址(未成功连接过)。只存储在一个bucket中
bucketTypeOld = 0x02 // 标识旧地址,可靠地址(已成功连接过)。可以存储在多个bucket中,最多为maxNewBucketsPerAddress个
)

注意: 一个地址的类型变更不在此文章中做介绍,后期的文章会讨论该问题

地址簿相关结构体

地址簿

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type AddrBook struct {
cmn.BaseService

mtx sync.Mutex
filePath string // 地址簿路径
routabilityStrict bool // 是否可路由,默认为true
rand *rand.Rand
key string // 地址簿标识,用于计算addrNew和addrOld的索引
ourAddrs map[string]*NetAddress // 存储本地网络地址,用于添加p2p地址时做排除使用
addrLookup map[string]*knownAddress // 存储新、旧地址集,用于查询
addrNew []map[string]*knownAddress // 存储新地址
addrOld []map[string]*knownAddress // 存储旧地址
wg sync.WaitGroup
nOld int // 旧地址数量
nNew int // 新地址数量
}

已知地址

1
2
3
4
5
6
7
8
9
type knownAddress struct {
Addr *NetAddress // 已知peer的addr
Src *NetAddress // 已知peer的addr的来源addr
Attempts int32 // 连接peer的重试次数
LastAttempt time.Time // 最近一次尝试连接的时间
LastSuccess time.Time // 最近一次尝试成功连接的时间
BucketType byte // 地址的类型(表示可靠地址或不可靠地址)
Buckets []int // 当前addr所属的buckets
}

routabilityStrict参数表示地址簿是否存储的ip是否可路由。可路由是根据RFC划分,具体参考资料:RFC标准

初始化地址簿

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// NewAddrBook creates a new address book.
// Use Start to begin processing asynchronous address updates.
func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook {
am := &AddrBook{
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
ourAddrs: make(map[string]*NetAddress),
addrLookup: make(map[string]*knownAddress),
filePath: filePath,
routabilityStrict: routabilityStrict,
}
am.init()
am.BaseService = *cmn.NewBaseService(nil, "AddrBook", am)
return am
}

// When modifying this, don't forget to update loadFromFile()
func (a *AddrBook) init() {
// 地址簿唯一标识
a.key = crypto.CRandHex(24) // 24/2 * 8 = 96 bits
// New addr buckets, 默认为256个大小
a.addrNew = make([]map[string]*knownAddress, newBucketCount)
for i := range a.addrNew {
a.addrNew[i] = make(map[string]*knownAddress)
}
// Old addr buckets,默认为64个大小
a.addrOld = make([]map[string]*knownAddress, oldBucketCount)
for i := range a.addrOld {
a.addrOld[i] = make(map[string]*knownAddress)
}
}

bytomd启动时加载本地地址簿

loadFromFile在bytomd启动时,首先会加载本地的地址簿

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// OnStart implements Service.
func (a *AddrBook) OnStart() error {
a.BaseService.OnStart()
a.loadFromFile(a.filePath)
a.wg.Add(1)
go a.saveRoutine()
return nil
}

// Returns false if file does not exist.
// cmn.Panics if file is corrupt.
func (a *AddrBook) loadFromFile(filePath string) bool {
// If doesn't exist, do nothing.
// 如果本地地址簿不存在则直接返回
_, err := os.Stat(filePath)
if os.IsNotExist(err) {
return false
}

// 加载地址簿json内容
// Load addrBookJSON{}
r, err := os.Open(filePath)
if err != nil {
cmn.PanicCrisis(cmn.Fmt("Error opening file %s: %v", filePath, err))
}
defer r.Close()
aJSON := &addrBookJSON{}
dec := json.NewDecoder(r)
err = dec.Decode(aJSON)
if err != nil {
cmn.PanicCrisis(cmn.Fmt("Error reading file %s: %v", filePath, err))
}

// 填充addrNew、addrOld等
// Restore all the fields...
// Restore the key
a.key = aJSON.Key
// Restore .addrNew & .addrOld
for _, ka := range aJSON.Addrs {
for _, bucketIndex := range ka.Buckets {
bucket := a.getBucket(ka.BucketType, bucketIndex)
bucket[ka.Addr.String()] = ka
}
a.addrLookup[ka.Addr.String()] = ka
if ka.BucketType == bucketTypeNew {
a.nNew++
} else {
a.nOld++
}
}
return true
}

定时更新地址簿

bytomd会定时更新本地地址簿,默认2分钟一次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func (a *AddrBook) saveRoutine() {
dumpAddressTicker := time.NewTicker(dumpAddressInterval)
out:
for {
select {
case <-dumpAddressTicker.C:
a.saveToFile(a.filePath)
case <-a.Quit:
break out
}
}
dumpAddressTicker.Stop()
a.saveToFile(a.filePath)
a.wg.Done()
log.Info("Address handler done")
}

func (a *AddrBook) saveToFile(filePath string) {
log.WithField("size", a.Size()).Info("Saving AddrBook to file")

a.mtx.Lock()
defer a.mtx.Unlock()
// Compile Addrs
addrs := []*knownAddress{}
for _, ka := range a.addrLookup {
addrs = append(addrs, ka)
}

aJSON := &addrBookJSON{
Key: a.key,
Addrs: addrs,
}

jsonBytes, err := json.MarshalIndent(aJSON, "", "\t")
if err != nil {
log.WithField("err", err).Error("Failed to save AddrBook to file")
return
}
err = cmn.WriteFileAtomic(filePath, jsonBytes, 0644)
if err != nil {
log.WithFields(log.Fields{
"file": filePath,
"err": err,
}).Error("Failed to save AddrBook to file")
}
}

添加新地址

当peer之间交换addr时,节点会收到对端节点已知的地址信息,这些信息会被当前节点添加到地址簿中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
func (a *AddrBook) AddAddress(addr *NetAddress, src *NetAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
log.WithFields(log.Fields{
"addr": addr,
"src": src,
}).Debug("Add address to book")
a.addAddress(addr, src)
}


func (a *AddrBook) addAddress(addr, src *NetAddress) {
// 验证地址是否为可路由地址
if a.routabilityStrict && !addr.Routable() {
log.Error(cmn.Fmt("Cannot add non-routable address %v", addr))
return
}
// 验证地址是否为本地节点地址
if _, ok := a.ourAddrs[addr.String()]; ok {
// Ignore our own listener address.
return
}

// 验证地址是否存在地址集中
// 如果存在:则判断该地址是否为old可靠地址、是否超过了最大buckets中。否则根据该地址已经被ka.Buckets引用的个数来随机决定是否添加到地址集中
// 如果不存在:则添加到地址集中。并标识为bucketTypeNew地址类型
ka := a.addrLookup[addr.String()]

if ka != nil {
// Already old.
if ka.isOld() {
return
}
// Already in max new buckets.
if len(ka.Buckets) == maxNewBucketsPerAddress {
return
}
// The more entries we have, the less likely we are to add more.
factor := int32(2 * len(ka.Buckets))
if a.rand.Int31n(factor) != 0 {
return
}
} else {
ka = newKnownAddress(addr, src)
}

// 找到该地址在地址集的索引位置并添加
bucket := a.calcNewBucket(addr, src)
a.addToNewBucket(ka, bucket)

log.Info("Added new address ", "address:", addr, " total:", a.size())
}

选择最优节点

地址簿中存储众多地址,在p2p网络中需选择最优的地址去连接
PickAddress(newBias int)函数中newBias是由pex_reactor产生的地址评分。如何计算地址分数在其他章节中再讲
根据地址评分随机选择地址可增加区块链安全性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// Pick an address to connect to with new/old bias.
func (a *AddrBook) PickAddress(newBias int) *NetAddress {
a.mtx.Lock()
defer a.mtx.Unlock()

if a.size() == 0 {
return nil
}
// newBias地址分数限制在0-100分数之间
if newBias > 100 {
newBias = 100
}
if newBias < 0 {
newBias = 0
}

// Bias between new and old addresses.
oldCorrelation := math.Sqrt(float64(a.nOld)) * (100.0 - float64(newBias))
newCorrelation := math.Sqrt(float64(a.nNew)) * float64(newBias)

// 根据地址分数计算是否从addrOld或addrNew中随机选择一个地址
if (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation {
// pick random Old bucket.
var bucket map[string]*knownAddress = nil
num := 0
for len(bucket) == 0 && num < oldBucketCount {
bucket = a.addrOld[a.rand.Intn(len(a.addrOld))]
num++
}
if num == oldBucketCount {
return nil
}
// pick a random ka from bucket.
randIndex := a.rand.Intn(len(bucket))
for _, ka := range bucket {
if randIndex == 0 {
return ka.Addr
}
randIndex--
}
cmn.PanicSanity("Should not happen")
} else {
// pick random New bucket.
var bucket map[string]*knownAddress = nil
num := 0
for len(bucket) == 0 && num < newBucketCount {
bucket = a.addrNew[a.rand.Intn(len(a.addrNew))]
num++
}
if num == newBucketCount {
return nil
}
// pick a random ka from bucket.
randIndex := a.rand.Intn(len(bucket))
for _, ka := range bucket {
if randIndex == 0 {
return ka.Addr
}
randIndex--
}
cmn.PanicSanity("Should not happen")
}
return nil
}

移除一个地址

当一个地址被标记为Bad时则从地址集中移除。目前bytomd的代码版本并未调用过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (a *AddrBook) MarkBad(addr *NetAddress) {
a.RemoveAddress(addr)
}

// RemoveAddress removes the address from the book.
func (a *AddrBook) RemoveAddress(addr *NetAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
ka := a.addrLookup[addr.String()]
if ka == nil {
return
}
log.WithField("addr", addr).Info("Remove address from book")
a.removeFromAllBuckets(ka)
}

func (a *AddrBook) removeFromAllBuckets(ka *knownAddress) {
for _, bucketIdx := range ka.Buckets {
bucket := a.getBucket(ka.BucketType, bucketIdx)
delete(bucket, ka.Addr.String())
}
ka.Buckets = nil
if ka.BucketType == bucketTypeNew {
a.nNew--
} else {
a.nOld--
}
delete(a.addrLookup, ka.Addr.String())
}

bytom源码分析-P2P网络-upnp端口映射

发表于 2018-04-28

bytom源码分析-P2P网络-upnp端口映射

简介

https://github.com/Bytom/bytom

本章介绍bytom代码P2P网络中upnp端口映射

作者使用MacOS操作系统,其他平台也大同小异

Golang Version: 1.8

UPNP介绍

UPNP(Universal Plug and Play)通用即插即用。UPNP端口映射将一个外部端口映射到一个内网ip:port。从而实现p2p网络从外网能够穿透网关访问到内网的bytomd节点。

UPNP协议

SSDP(Simple Service Discovery Protocol 简单服务发现协议)
GENA(Generic Event Notification Architecture 通用事件通知结构)
SOAP(Simple Object Access Protocol 简单对象访问协议)
XML(Extensible Markup Language 可扩张标记语言)

UPNP代码

p2p/upnp/upnp.go

发现网络中支持UPNP功能的设备

从网络中发现支持UPNP功能的设备,并得到该设备的location和url等相关信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
type upnpNAT struct {
serviceURL string // 设备的描述文件URL,用于得到该设备的描述信息
ourIP string // 节点本地ip地址
urnDomain string // 设备类型
}

func Discover() (nat NAT, err error) {
ssdp, err := net.ResolveUDPAddr("udp4", "239.255.255.250:1900")
if err != nil {
return
}
conn, err := net.ListenPacket("udp4", ":0")
if err != nil {
return
}
socket := conn.(*net.UDPConn)
defer socket.Close()

err = socket.SetDeadline(time.Now().Add(3 * time.Second))
if err != nil {
return
}

st := "InternetGatewayDevice:1"

// 多播请求:M-SEARCH SSDP协议定义的发现请求。
buf := bytes.NewBufferString(
"M-SEARCH * HTTP/1.1\r\n" +
"HOST: 239.255.255.250:1900\r\n" +
"ST: ssdp:all\r\n" +
"MAN: \"ssdp:discover\"\r\n" +
"MX: 2\r\n\r\n")
message := buf.Bytes()
answerBytes := make([]byte, 1024)
for i := 0; i < 3; i++ {
// 向239.255.255.250:1900发送一条多播请求
_, err = socket.WriteToUDP(message, ssdp)
if err != nil {
return
}
// 如果从网络中发现UPNP设备则会从239.255.255.250:1900收到响应消息
var n int
n, _, err = socket.ReadFromUDP(answerBytes)
for {
n, _, err = socket.ReadFromUDP(answerBytes)
if err != nil {
break
}
answer := string(answerBytes[0:n])
if strings.Index(answer, st) < 0 {
continue
}
// HTTP header field names are case-insensitive.
// http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2
// 获得设备location
locString := "\r\nlocation:"
answer = strings.ToLower(answer)
locIndex := strings.Index(answer, locString)
if locIndex < 0 {
continue
}
loc := answer[locIndex+len(locString):]
endIndex := strings.Index(loc, "\r\n")
if endIndex < 0 {
continue
}
// 获得设备的描述url和设备类型
locURL := strings.TrimSpace(loc[0:endIndex])
var serviceURL, urnDomain string
serviceURL, urnDomain, err = getServiceURL(locURL)
if err != nil {
return
}
var ourIP net.IP
ourIP, err = localIPv4()
if err != nil {
return
}
nat = &upnpNAT{serviceURL: serviceURL, ourIP: ourIP.String(), urnDomain: urnDomain}
return
}
}
err = errors.New("UPnP port discovery failed.")
return
}

添加端口映射

向upnp设备发送一条http post请求,将内部网络ip:port和外部网络ip:port做映射

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (n *upnpNAT) AddPortMapping(protocol string, externalPort, internalPort int, description string, timeout int) (mappedExternalPort int, err error) {
// A single concatenation would break ARM compilation.
message := "<u:AddPortMapping xmlns:u=\"urn:" + n.urnDomain + ":service:WANIPConnection:1\">\r\n" +
"<NewRemoteHost></NewRemoteHost><NewExternalPort>" + strconv.Itoa(externalPort)
message += "</NewExternalPort><NewProtocol>" + protocol + "</NewProtocol>"
message += "<NewInternalPort>" + strconv.Itoa(internalPort) + "</NewInternalPort>" +
"<NewInternalClient>" + n.ourIP + "</NewInternalClient>" +
"<NewEnabled>1</NewEnabled><NewPortMappingDescription>"
message += description +
"</NewPortMappingDescription><NewLeaseDuration>" + strconv.Itoa(timeout) +
"</NewLeaseDuration></u:AddPortMapping>"

var response *http.Response
response, err = soapRequest(n.serviceURL, "AddPortMapping", message, n.urnDomain)
if response != nil {
defer response.Body.Close()
}
if err != nil {
return
}

// TODO: check response to see if the port was forwarded
// log.Println(message, response)
// JAE:
// body, err := ioutil.ReadAll(response.Body)
// fmt.Println(string(body), err)
mappedExternalPort = externalPort
_ = response
return
}

删除端口映射

向upnp设备发送一条http post请求,将内部网络ip:port和外部网络ip:port删除映射关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (n *upnpNAT) DeletePortMapping(protocol string, externalPort, internalPort int) (err error) {

message := "<u:DeletePortMapping xmlns:u=\"urn:" + n.urnDomain + ":service:WANIPConnection:1\">\r\n" +
"<NewRemoteHost></NewRemoteHost><NewExternalPort>" + strconv.Itoa(externalPort) +
"</NewExternalPort><NewProtocol>" + protocol + "</NewProtocol>" +
"</u:DeletePortMapping>"

var response *http.Response
response, err = soapRequest(n.serviceURL, "DeletePortMapping", message, n.urnDomain)
if response != nil {
defer response.Body.Close()
}
if err != nil {
return
}

// TODO: check response to see if the port was deleted
// log.Println(message, response)
_ = response
return
}

获取映射后的公网地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (n *upnpNAT) GetExternalAddress() (addr net.IP, err error) {
info, err := n.getExternalIPAddress()
if err != nil {
return
}
addr = net.ParseIP(info.externalIpAddress)
return
}

func (n *upnpNAT) getExternalIPAddress() (info statusInfo, err error) {

message := "<u:GetExternalIPAddress xmlns:u=\"urn:" + n.urnDomain + ":service:WANIPConnection:1\">\r\n" +
"</u:GetExternalIPAddress>"

var response *http.Response
response, err = soapRequest(n.serviceURL, "GetExternalIPAddress", message, n.urnDomain)
if response != nil {
defer response.Body.Close()
}
if err != nil {
return
}
var envelope Envelope
data, err := ioutil.ReadAll(response.Body)
reader := bytes.NewReader(data)
xml.NewDecoder(reader).Decode(&envelope)

info = statusInfo{envelope.Soap.ExternalIP.IPAddress}

if err != nil {
return
}

return
}

bytom源码分析-启动与停止

发表于 2018-04-21

bytom源码分析-启动与停止

简介

https://github.com/Bytom/bytom
本章介绍bytom代码启动、节点初始化、及停止的过程

作者使用MacOS操作系统,其他平台也大同小异
Golang Version: 1.8

预备工作

编译安装

详细步骤见官方 bytom install

设置debug日志输出

开启debug输出文件、函数、行号等详细信息

1
export BYTOM_DEBUG=debug

初始化并启动bytomd

初始化

1
./bytomd init --chain_id testnet

bytomd目前支持两种网络,这里我们使用测试网
mainnet:主网
testnet:测试网

启动bytomd

1
./bytomd node --mining --prof_laddr=":8011"

–prof_laddr=”:8080” // 开启pprof输出性能指标
访问:http://127.0.0.1:8080/debug/pprof/

bytomd init初始化

入口函数
cmd/bytomd/main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func init() {
log.SetFormatter(&log.TextFormatter{FullTimestamp: true, DisableColors: true})

// If environment variable BYTOM_DEBUG is not empty,
// then add the hook to logrus and set the log level to DEBUG
if os.Getenv("BYTOM_DEBUG") != "" {
log.AddHook(ContextHook{})
log.SetLevel(log.DebugLevel)
}
}

func main() {
cmd := cli.PrepareBaseCmd(commands.RootCmd, "TM", os.ExpandEnv(config.DefaultDataDir()))
cmd.Execute()
}

init函数会在main执行之前做初始化操作,可以看到init中bytomd加载BYTOM_DEBUG变量来设置debug日志输出

command cli传参初始化
bytomd的cli解析使用cobra库

cmd/bytomd/commands

  • cmd/bytomd/commands/root.go
    初始化–root传参。bytomd存储配置、keystore、数据的root目录。在MacOS下,默认路径是~/Library/Bytom/
  • cmd/bytomd/commands/init.go
    初始化–chain_id传参。选择网络类型,在启动bytomd时我们选择了testnet也就是测试网络
  • cmd/bytomd/commands/version.go
    初始化version传参
  • cmd/bytomd/commands/run_node.go
    初始化node节点运行时所需要的传参

初始化默认配置
用户传参只有一部分参数,那节点所需的其他参数需要从默认配置中加载。
cmd/bytomd/commands/root.go

1
2
3
var (
config = cfg.DefaultConfig()
)

在root.go中有一个config全局变量加载了node所需的所有默认参数

1
2
3
4
5
6
7
8
9
10
// Default configurable parameters.
func DefaultConfig() *Config {
return &Config{
BaseConfig: DefaultBaseConfig(), // node基础相关配置
P2P: DefaultP2PConfig(), // p2p网络相关配置
Wallet: DefaultWalletConfig(), // 钱包相关配置
Auth: DefaultRPCAuthConfig(), // 验证相关配置
Web: DefaultWebConfig(), // web相关配置
}
}

后面的文章会一一介绍每个配置的作用

bytomd 守护进程启动与退出

cmd/bytomd/commands/run_node.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func runNode(cmd *cobra.Command, args []string) error {
// Create & start node
n := node.NewNode(config)
if _, err := n.Start(); err != nil {
return fmt.Errorf("Failed to start node: %v", err)
} else {
log.WithField("nodeInfo", n.SyncManager().Switch().NodeInfo()).Info("Started node")
}

// Trap signal, run forever.
n.RunForever()

return nil
}

runNode函数有三步操作:
node.NewNode:初始化node运行环境
n.Start:启动node
n.RunForever:监听退出信号,收到ctrl+c操作则退出node。在linux中守进程一般监听SIGTERM信号(ctrl+c)作为退出守护进程的信号

初始化node运行环境

在bytomd中有五个db数据库存储在–root参数下的data目录

  • accesstoken.db // 存储token相关信息(钱包访问控制权限)
  • trusthistory.db // 存储p2p网络同步相关信息
  • txdb.db // 存储交易相关信息
  • txfeeds.db //
  • wallet.db // 存储钱包相关信息

node/node.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
func NewNode(config *cfg.Config) *Node {
ctx := context.Background()
initActiveNetParams(config)
// Get store 初始化txdb数据库
txDB := dbm.NewDB("txdb", config.DBBackend, config.DBDir())
store := leveldb.NewStore(txDB)

// 初始化accesstoken数据库
tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
accessTokens := accesstoken.NewStore(tokenDB)

// 初始化event事件调度器,也叫任务调度器。一个任务可以被多次调用
// Make event switch
eventSwitch := types.NewEventSwitch()
_, err := eventSwitch.Start()
if err != nil {
cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
}

// 初始化交易池
txPool := protocol.NewTxPool()
chain, err := protocol.NewChain(store, txPool)
if err != nil {
cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
}

var accounts *account.Manager = nil
var assets *asset.Registry = nil
var wallet *w.Wallet = nil
var txFeed *txfeed.Tracker = nil

// 初始化txfeeds数据库
txFeedDB := dbm.NewDB("txfeeds", config.DBBackend, config.DBDir())
txFeed = txfeed.NewTracker(txFeedDB, chain)

if err = txFeed.Prepare(ctx); err != nil {
log.WithField("error", err).Error("start txfeed")
return nil
}

// 初始化keystore
hsm, err := pseudohsm.New(config.KeysDir())
if err != nil {
cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
}

// 初始化钱包,默认wallet是开启状态
if !config.Wallet.Disable {
walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
accounts = account.NewManager(walletDB, chain)
assets = asset.NewRegistry(walletDB, chain)
wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain)
if err != nil {
log.WithField("error", err).Error("init NewWallet")
}

// Clean up expired UTXO reservations periodically.
go accounts.ExpireReservations(ctx, expireReservationsPeriod)
}
newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)

// 初始化网络节点同步管理
syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)

// 初始化pprof,pprof用于输出性能指标,需要制定--prof_laddr参数来开启,在文章开头我们已经开启该功能
// run the profile server
profileHost := config.ProfListenAddress
if profileHost != "" {
// Profiling bytomd programs.see (https://blog.golang.org/profiling-go-programs)
// go tool pprof http://profileHose/debug/pprof/heap
go func() {
http.ListenAndServe(profileHost, nil)
}()
}

// 初始化节点,填充节点所需的所有参数环境
node := &Node{
config: config,
syncManager: syncManager,
evsw: eventSwitch,
accessTokens: accessTokens,
wallet: wallet,
chain: chain,
txfeed: txFeed,
miningEnable: config.Mining,
}

// 初始化挖矿
node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh)

node.BaseService = *cmn.NewBaseService(nil, "Node", node)

return node
}

目前bytomd只支持cpu挖矿,所以在代码中只有cpuminer的初始化信息

启动node

node/node.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Lanch web broser or not
func lanchWebBroser() {
log.Info("Launching System Browser with :", webAddress)
if err := browser.Open(webAddress); err != nil {
log.Error(err.Error())
return
}
}

func (n *Node) initAndstartApiServer() {
n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens)

listenAddr := env.String("LISTEN", n.config.ApiAddress)
env.Parse()
n.api.StartServer(*listenAddr)
}

func (n *Node) OnStart() error {
if n.miningEnable {
n.cpuMiner.Start()
}
n.syncManager.Start()
n.initAndstartApiServer()
if !n.config.Web.Closed {
lanchWebBroser()
}

return nil
}

OnStart() 启动node进程如下:

  • 启动挖矿功能
  • 启动p2p网络同步
  • 启动http协议的apiserver服务
  • 打开浏览器访问bytond的交易页面

停止node

bytomd在启动时执行了n.RunForever()函数,该函数是由tendermint框架启动了监听信号的功能:
vendor/github.com/tendermint/tmlibs/common/os.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func TrapSignal(cb func()) {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
for sig := range c {
fmt.Printf("captured %v, exiting...\n", sig)
if cb != nil {
cb()
}
os.Exit(1)
}
}()
select {}
}

TrapSignal函数监听了SIGTERM信号,bytomd才能成为不退出的守护进程。只有当触发了ctrl+c或kill bytomd_pid才能终止bytomd进程退出。退出时bytomd执行如下操作
node/node.go

1
2
3
4
5
6
7
8
9
func (n *Node) OnStop() {
n.BaseService.OnStop()
if n.miningEnable {
n.cpuMiner.Stop()
}
n.syncManager.Stop()
log.Info("Stopping Node")
// TODO: gracefully disconnect from peers.
}

bytomd会将挖矿功能停止,p2p网络停止等操作。

Hexo

发表于 2016-05-24

Welcome to Hexo! This is your very first post. Check documentation for more info. If you get any problems when using Hexo, you can find the answer in troubleshooting or you can ask me on GitHub.

Quick Start

Create a new post

1
$ hexo new "My New Post"

More info: Writing

Run server

1
$ hexo server

More info: Server

Generate static files

1
$ hexo generate

More info: Generating

Deploy to remote sites

1
$ hexo deploy

More info: Deployment

12
Derek

Derek

If you only do what you can do you'll never be more than you are now

12 日志
1 分类
2 标签
RSS
© 2018 Derek
由 Hexo 强力驱动
主题 - NexT.Pisces