网络层设计与实现

undefined网络层设计与实现

undefinedNode

一个网络节点(Node)命名为Network。

Node用Network来定义和实现,特指P2P网络节点,更体现Node的本质。

  1. // Network 节点的数据结构
  2. type Network struct {
  3. Host host.Host//主机
  4. GeneralChannel *Channel//通用节点
  5. MiningChannel *Channel//挖矿节点
  6. FullNodesChannel *Channel//全节点
  7. Blockchain *blockchain.Blockchain
  8. Blocks chan *blockchain.Block//Block类型的通道
  9. Transactions chan *blockchain.Transaction//Transaction类型的通道
  10. Miner bool
  11. }

undefinedChannel

Channel为通信通道,每个host有三个通信通道,但根据其节点的类别,一般一个节点只用到其中一个通信通道。

  1. // Channel 的数据结构
  2. type Channel struct {
  3. ctx context.Context
  4. pub *pubsub.PubSub//发布者
  5. topic *pubsub.Topic
  6. sub *pubsub.Subscription//订阅者
  7. channelName string//构成Topic名称字符串的组成部分(TopicName="channel:" + channelName)
  8. self peer.ID
  9. Content chan *ChannelContent//ChannelContent类型的通道
  10. }

GeneralChannel为通用节点,负责列举所有连接到主机(host)的所有peer,这也是所有连接到host的peer,处理除了tx之外的所有命令消息。

FullNodesChannel为全节点,处理与交易相关的tx及gettxfrompool命令,即将新交易放到内存池,以及每秒不断将交易从交易池中取出(这里我们每秒只取出一条交易,可以优化为每次取出多条交易)给挖矿节点进行挖矿。

MiningChannel为挖矿节点,处理来自交易池的inv命令及来自交易池的tx命令。

undefinedHost

P2P的host package定义了Host这一interface。

  1. // 为本主机(host)创建一对新的 RSA 密钥
  2. prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
  3. if err != nil {
  4. panic(err)
  5. }
  6. transports := libp2p.ChainOptions(
  7. libp2p.Transport(tcp.NewTCPTransport),//支持TCP传输协议
  8. libp2p.Transport(ws.New),//支持websorcket传输协议
  9. )
  10. muxers := libp2p.ChainOptions(
  11. libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport),//支持"/yamux/1.0.0"流连接(基于可靠连接的多路I/O复用)
  12. libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),//支持"/mplex/6.7.0"流连接(二进制流多路I/O复用),由LibP2P基于multiplex创建
  13. )
  14. if len(listenPort) == 0 {
  15. listenPort = "0"
  16. }
  17. listenAddrs := libp2p.ListenAddrStrings(
  18. fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", listenPort),//支持tcp传输
  19. fmt.Sprintf("/ip4/0.0.0.0/tcp/%s/ws", listenPort),//支持websorket传输
  20. )
  21. // Host是参与p2p网络的对象,它实现协议或提供服务。
  22. // 它像服务器一样处理请求,像客户端一样发出请求。
  23. // 之所以称为 Host,是因为它既是 Server 又是 Client(而 Peer 可能会混淆)。
  24. // 1、创建host
  25. // 重要:创建主机host
  26. //-如果没有提供transport和listen addresses,节点将监听在多地址(mutiaddresses): "/ip4/0.0.0.0/tcp/0" 和 "/ip6/::/tcp/0";
  27. //-如果没有提供transport的选项,节点使用TCP和websorcket传输协议
  28. //-如果multiplexer配置没有提供,节点缺省使用"yamux/1.0.0" 和 "mplux/6.7.0"流连接配置
  29. //-如果没有提供security transport,主机使用go-libp2p的noise和/或tls加密的transport来加密所有的traffic(新版本libp2p已经不再支持security transport参数设置)
  30. //-如果没有提供peer的identity,它产生一个随机RSA 2048键值对,并由它导出一个新的identity
  31. //-如果没有提供peerstore,主机使用一个空的peerstore来进行初始化
  32. host, err := libp2p.New(
  33. ctx,
  34. transports,
  35. listenAddrs,
  36. muxers,
  37. libp2p.Identity(prvKey),
  38. )

上述代码中第一步创建Host:

  1. host, err := libp2p.New(...)

我们追溯New函数,它来自于libp2p.go,最终调用的是:

  1. func NewWithoutDefaults(ctx context.Context, opts ...Option) (host.Host, error) {
  2. varcfg Config
  3. if err := cfg.Apply(opts...); err != nil {
  4. returnnil, err
  5. }
  6. return cfg.NewNode(ctx)
  7. }

我们继续追溯cfg.NewNode(ctx),在P2Plib的config.go,关键代码如下:

  1. func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) {
  2. swrm, err := cfg.makeSwarm(ctx)
  3. if err != nil {
  4. returnnil, err
  5. }
  6. h, err := bhost.NewHost(ctx, swrm, &bhost.HostOpts{
  7. ConnManager: cfg.ConnManager,
  8. AddrsFactory: cfg.AddrsFactory,
  9. NATManager: cfg.NATManager,
  10. EnablePing: !cfg.DisablePing,
  11. UserAgent: cfg.UserAgent,
  12. })
  13. ...
  14. h.Start()
  15. if router != nil {
  16. return outed.Wrap(h, router), nil
  17. }
  18. return h, nil
  19. }

security transport,默认的值为:

  1. var DefaultSecurity = libp2p.ChainOptions(
  2. Security(noise.ID, noise.New),
  3. Security(tls.ID, tls.New),
  4. )

上述代码的第一个关键是:

  1. swrm, err := cfg.makeSwarm(ctx)

我们追溯进去,看看cfg.makeSwarm(ctx):

  1. func (cfg *Config) makeSwarm(ctx context.Context) (*swarm.Swarm, error) {
  2. //从config保存的公钥得到pid
  3. pid, err := peer.IDFromPublicKey(cfg.PeerKey.GetPublic())
  4. ...
  5. swrm := swarm.NewSwarm(ctx, pid, cfg.Peerstore, cfg.Reporter, cfg.ConnectionGater)
  6. return swrm, nil

我们继续追溯swarm.NewSwarm(ctx, pid, cfg.Peerstore, cfg.Reporter, cfg.ConnectionGater):

  1. func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc metrics.Reporter, extra ...interface{}) *Swarm {
  2. s := &Swarm{
  3. local: local,
  4. peers: peers,
  5. bwc: bwc,
  6. }
  7. ...
  8. return s

可见,peer.ID被赋值到Swarm对象的local变量。

我们回到函数:

  1. func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) {
  2. swrm, err := cfg.makeSwarm(ctx)
  3. if err != nil {
  4. returnnil, err
  5. }
  6. h, err := bhost.NewHost(ctx, swrm, &bhost.HostOpts{
  7. ConnManager: cfg.ConnManager,
  8. AddrsFactory: cfg.AddrsFactory,
  9. NATManager: cfg.NATManager,
  10. EnablePing: !cfg.DisablePing,
  11. UserAgent: cfg.UserAgent,
  12. })
  13. ...
  14. h.Start()
  15. if router != nil {
  16. return outed.Wrap(h, router), nil
  17. }
  18. return h, nil
  19. }

前面已经讨论完了swrm, err := cfg.makeSwarm(ctx),我们继续往下看,swrm成为创建Host的一个参数:

  1. h, err := bhost.NewHost(ctx, swrm, &bhost.HostOpts{
  2. ConnManager: cfg.ConnManager,
  3. AddrsFactory: cfg.AddrsFactory,
  4. NATManager: cfg.NATManager,
  5. EnablePing: !cfg.DisablePing,
  6. UserAgent: cfg.UserAgent,
  7. })

bhost是一个package:

  1. bhost "github.com/libp2p/go-libp2p/p2p/host/basic"

我们查看上面的NewHost,进入到basichost package(basic_host.go):

定义了basichost:

  1. type BasicHost struct

然后BasicHost实现了Host的所有接口方法,其中NewHost接口实现如下:

  1. func NewHost(ctx context.Context, n network.Network, opts \*HostOpts) (\*BasicHost, error) {
  2. hostCtx, cancel := context.WithCancel(ctx)
  3. h := &BasicHost{
  4. network: n,
  5. mux: msmux.NewMultistreamMuxer(),
  6. negtimeout: DefaultNegotiationTimeout,
  7. AddrsFactory: DefaultAddrsFactory,
  8. maResolver: madns.DefaultResolver,
  9. eventbus: eventbus.NewBus(),
  10. addrChangeChan: make(chanstruct{}, 1),
  11. ctx: hostCtx,
  12. ctxCancel: cancel,
  13. disableSignedPeerRecord: opts.DisableSignedPeerRecord,
  14. }
  15. ...
  16. return h, nil

swrm作为参数传给了n network.Network。而实现的接口:

  1. func (h *BasicHost) ID() peer.ID {
  2. return h.Network().LocalPeer()
  3. }

h.Network()返回Swarm对象(Swarm是一个struct,实现了接口network.Network(Network是一个interface))

  1. func (h *BasicHost) Network() network.Network {
  2. return h.network
  3. }

我们看看Swarm的函数LocalPeer(),正好返回的是local(即peer的ID):

  1. func (s *Swarm) LocalPeer() peer.ID {
  2. return s.local
  3. }

undefined小结

1、主机实际上是BasicHost struct,它实现了Host interface,peer.ID在创建host时候已经在Host中得到了(host.ID()得到的即是peer.ID)。

2、同时Swarm struct实现了libp2p的network.Network interface。

3、BasicHost和Swarm均由p2plib提供。

undefinedPeer

Peer为对等端,是host的第三方视觉的概念。

Peer以ID为唯一标识,peer.ID是通过哈希peer的公钥而派生,并编码其哈希输出为multihash的结果。

peer.ID是往后不同节点之间进行通信的重要参数,它代表一个Host,或者说,我们可以通过peer.ID获得一个具体的Host对象。如发送虚拟币:

  1. func (net *Network) SendTx(peerId string, transaction *blockchain.Transaction) {
  2. memoryPool.Add(*transaction)
  3. tnx := Tx{net.Host.ID().Pretty(), transaction.Serializer()}
  4. payload := GobEncode(tnx)
  5. request := append(CmdToBytes("tx"), payload...)
  6. // 给全节点(FullNodes)第通信通道发布此消息,全节点将进行处理
  7. net.FullNodesChannel.Publish("接收到 Send transaction 命令", request, peerId)
  8. }

如同Host一样,peer package也是在libp2p库中定义,所在的文件是peer.go,不同的是,在peer.go中并没有定义一个peer的struct,而是直接在peer package中定义ID:

  1. type ID string

但显然ID是一个mutihash的值,如需要对外呈现需要使用base58编码后得到人可以识别的字符串:

  1. func (id ID) String() string {
  2. return id.Pretty()
  3. }

Pretty方法如下:

  1. func (id ID) Pretty() string {
  2. returnIDB58Encode(id)
  3. }

undefined网络通信流程

一切从startNode开始。

main.go:

  1. cli.StartNode(listenPort, minerAddress, miner, fullNode, func(net *p2p.Network) {//最后一个参数是回调函数,获得net实例
  2. if rpc {
  3. cli.P2p = net//启动节点后设置cli的P2P实例,net为启动节点函数的回调函数参数被回调后返回的Network实例
  4. go jsonrpc.StartServer(cli, rpc, rpcPort, rpcAddr)
  5. }
  6. })

其中cli的结构:

  1. type CommandLinestruct {
  2. Blockchain *blockchain.Blockchain
  3. P2p *p2p.Network
  4. CloseDbAlways bool//每次命令执行完毕是否关闭数据库
  5. }

其中istenPort, minerAddress, miner, fullNode等参数的值来自于命令startnode执行时获得的命令行参数。

cli.StartNode实现:

  1. // StartNode 启动节点,其中fn为回调函数,p2p.StartNode调用过程中调用fn,设置p2p.Network实例
  2. func (cli *CommandLine) StartNode(listenPort, minerAddress string, miner, fullNode bool, fn func(*p2p.Network)) {
  3. if miner {
  4. log.Infof("作为矿工正在启动节点: %s\\n", listenPort)
  5. iflen(minerAddress) > 0 {
  6. if wallet.ValidateAddress(minerAddress) {
  7. log.Info("正在挖矿,接收奖励的地址是:", minerAddress)
  8. } else {
  9. log.Fatal("请提供一个合法的矿工地址")
  10. }
  11. }
  12. } else {
  13. log.Infof("在: %s\\n端口上启动节点", listenPort)
  14. }
  15. chain := cli.Blockchain.ContinueBlockchain()
  16. p2p.StartNode(chain, listenPort, minerAddress, miner, fullNode, fn)
  17. }

在获得了blockchain实例后,调用p2p package的StartNode函数:

  1. // StartNode 启动一个节点
  2. func StartNode(chain *blockchain.Blockchain, listenPort, minerAddress string, miner, fullNode bool, callback func(*Network)) {
  3. var r io.Reader
  4. r = rand.Reader//没有指定seed,使用随机种子
  5. MinerAddress = minerAddress
  6. ctx, cancel := context.WithCancel(context.Background())
  7. defercancel()
  8. defer chain.Database.Close()//函数运行结束,关闭区块链数据库
  9. go appUtils.CloseDB(chain)//启动协程,遇到程序强行终止信号时关闭数据库,退出程序
  10. // 为本主机(host)创建一对新的 RSA 密钥
  11. prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
  12. if err != nil {
  13. panic(err)
  14. }
  15. transports := libp2p.ChainOptions(
  16. libp2p.Transport(tcp.NewTCPTransport),//支持TCP传输协议
  17. libp2p.Transport(ws.New),//支持websorcket传输协议
  18. )
  19. muxers := libp2p.ChainOptions(
  20. libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport),//支持"/yamux/1.0.0"流连接(基于可靠连接的多路I/O复用)
  21. libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),//支持"/mplex/6.7.0"流连接(二进制流多路I/O复用),由LibP2P基于multiplex创建
  22. )
  23. if len(listenPort) == 0 {
  24. listenPort = "0"
  25. }
  26. listenAddrs := libp2p.ListenAddrStrings(
  27. fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", listenPort),//支持tcp传输
  28. fmt.Sprintf("/ip4/0.0.0.0/tcp/%s/ws", listenPort),//支持websorket传输
  29. )
  30. // Host是参与p2p网络的对象,它实现协议或提供服务。
  31. // 它像服务器一样处理请求,像客户端一样发出请求。
  32. // 之所以称为 Host,是因为它既是 Server 又是 Client(而 Peer 可能会混淆)。
  33. // 1、创建host
  34. // 重要:创建主机host
  35. //-如果没有提供transport和listen addresses,节点将监听在多地址(mutiaddresses): "/ip4/0.0.0.0/tcp/0" 和 "/ip6/::/tcp/0";
  36. //-如果没有提供transport的选项,节点使用TCP和websorcket传输协议
  37. //-如果multiplexer配置没有提供,节点缺省使用"yamux/1.0.0" 和 "mplux/6.7.0"流连接配置
  38. //-如果没有提供security transport,主机使用go-libp2p的noise和/或tls加密的transport来加密所有的traffic(新版本libp2p已经不再支持security transport参数设置)
  39. //-如果没有提供peer的identity,它产生一个随机RSA 2048键值对,并由它导出一个新的identity
  40. //-如果没有提供peerstore,主机使用一个空的peerstore来进行初始化
  41. host, err := libp2p.New(
  42. ctx,
  43. transports,
  44. listenAddrs,
  45. muxers,
  46. libp2p.Identity(prvKey),
  47. )
  48. if err != nil {
  49. panic(err)
  50. }
  51. for _, addr := range host.Addrs() {
  52. fmt.Println("正在监听在", addr)
  53. }
  54. log.Info("主机已创建: ", host.ID())
  55. // 2、使用GossipSub路由,创建一个新的基于Gossip 协议的 PubSub 服务系统
  56. // 任何一个主机节点,都是一个订阅发布服务系统
  57. // 这是整个区块链网络运行的关键所在
  58. pub, err := pubsub.NewGossipSub(ctx, host)
  59. if err != nil {
  60. panic(err)
  61. }
  62. // 3、构建三个通信通道,通信通道使用发布-订阅系统,在不同节点之间传递信息
  63. // 之所以需要三个通道,是因为未来规划不同节点拥有不同的功能,不同功能的节点完成不同类型的任务。
  64. // 三个通道的消息独立,只有订阅了该通道消息的节点,才能收到该通道的消息,然后进行处理,以完成相应的任务。
  65. // 任何一个节点,均创建了三个通道实例,这意味着人一个节点都可以根据需要,选择任意一个通道发送消息
  66. // 在订阅上,一个具体的节点, GeneralChannel 订阅将消息,如果是采矿节点(miner==true),miningChannel 会接收到消息,
  67. // 如果是全节点(fullNode==true),fullNodesChannel会接受到消息
  68. //GeneralChannel 通道订阅消息
  69. generalChannel, _ := JoinChannel(ctx, pub, host.ID(), GeneralChannel, true)
  70. subscribe := false
  71. if miner {
  72. subscribe = true
  73. }
  74. //如果是挖矿节点, miningChannel 订阅消息,否则 miningChannel 不订阅消息
  75. miningChannel, _ := JoinChannel(ctx, pub, host.ID(), MiningChannel, subscribe)
  76. subscribe = false
  77. if fullNode {
  78. subscribe = true
  79. }
  80. //如果是全节点, fullNodesChannel 订阅消息,否则 fullNodesChannel 不订阅消息
  81. fullNodesChannel, _ := JoinChannel(ctx, pub, host.ID(), FullNodesChannel, subscribe)
  82. // 3、为各通信通道建立命令行界面对象
  83. ui := NewCLIUI(generalChannel, miningChannel, fullNodesChannel)
  84. // 4、建立对等端(peer)发现机制(discovery),使得本节点可以被网络上的其它节点发现
  85. //同时将主机(host)连接到所有已经发现的对等端(peer)
  86. err = SetupDiscovery(ctx, host)
  87. if err != nil {
  88. panic(err)
  89. }
  90. network := &Network{
  91. Host: host,
  92. GeneralChannel: generalChannel,
  93. MiningChannel: miningChannel,
  94. FullNodesChannel: fullNodesChannel,
  95. Blockchain: chain,
  96. Blocks: make(chan *blockchain.Block, 200),
  97. Transactions: make(chan *blockchain.Transaction, 200),
  98. Miner: miner,
  99. }
  100. // 5、回调,将节点(network)实例传回
  101. callback(network)
  102. // 6、向全网请求区块信息,以补全本地区块链
  103. // 每一个节点均有区块链的一个完整副本
  104. err = RequestBlocks(network)
  105. // 7、启用协程,处理节点事件
  106. goHandleEvents(network)
  107. // 8、如果是矿工节点,启用协程,不断发送ping命令给全节点
  108. if miner {
  109. // 矿工事件循环,以不断地发送一个ping给全节点,目的是得到新的交易,为它挖矿,并添加到区块链
  110. go network.MinersEventLoop()
  111. }
  112. if err != nil {
  113. panic(err)
  114. }
  115. // 9、运行UI界面,将在Run函数体中启动协程,循环接收并处理全网节点publish的消息
  116. iferr = ui.Run(network); err != nil {
  117. log.Error("运行文字UI发生错误: %s", err)
  118. }
  119. }