URLStore 已经成为了有效的 RPC 服务,现在可以创建另一种代表 RPC 客户端的类型,它会转发请求到 RPC 服务器,我们称它为 ProxyStore

type ProxyStore struct {
    client *rpc.Client
}

一个 RPC 客户端必须使用 DialHTTP() 方法连接到服务器,所以我们把这句加入 NewProxyStore 函数,它用于创建 ProxyStore 对象。

func NewProxyStore(addr string) *ProxyStore {
    client, err := rpc.DialHTTP("tcp", addr)
    if err != nil {
        log.Println("Error constructing ProxyStore:", err)
    }
    return &ProxyStore{client: client}
}

ProxyStoreGetPut 方法,它们利用 RPC 客户端的 Call 方法,将请求直接传递给服务器:

func (s *ProxyStore) Get(key, url *string) error {
    return s.client.Call("Store.Get", key, url)
}

func (s *ProxyStore) Put(url, key *string) error {
    return s.client.Call("Store.Put", url, key)
}

带缓存的 ProxyStore

可是,如果 slave 进程只是简单地代理所有的工作到 master 节点,不会得到任何增益!我们打算用 slave 节点来应对 Get 请求。要做到这点,它们必须有 URLStore 中 map 的一份副本(缓存)。因此我们对 ProxyStore 的定义进行扩展,将 URLStore 包含在其中:

type ProxyStore struct {
    urls *URLStore
    client *rpc.Client
}

NewProxyStore 也必须做修改:

func NewProxyStore(addr string) *ProxyStore {
    client, err := rpc.DialHTTP("tcp", addr)
    if err != nil {
        log.Println("ProxyStore:", err)
    }
    return &ProxyStore{urls: NewURLStore(""), client: client}
}

还必须修改 NewURLStore 以便给出空文件名时,不会尝试从磁盘写入或读取文件:

func NewURLStore(filename string) *URLStore {
    s := &URLStore{urls: make(map[string]string)}
    if filename != "" {
        s.save = make(chan record, saveQueueLength)
        if err := s.load(filename); err != nil {
            log.Println("Error loading URLStore: ", err)
        }
        go s.saveLoop(filename)
    }
    return s
}

ProxyStoreGet 方法需要扩展:它应该首先检查缓存中是否有对应的键。如果有,Get 返回已缓存的结果。否则,应该发起 RPC 调用,然后用返回结果更新其本地缓存:

func (s *ProxyStore) Get(key, url *string) error {
    if err := s.urls.Get(key, url); err == nil { // url found in local map
        return nil
    }
    // url not found in local map, make rpc-call:
    if err := s.client.Call("Store.Get", key, url); err != nil {
        return err
    }
    s.urls.Set(key, url)
    return nil
}

同样地,Put 方法仅当成功完成了远程 RPC Put 调用,才更新本地缓存:

func (s *ProxyStore) Put(url, key *string) error {
    if err := s.client.Call("Store.Put", url, key); err != nil {
        return err
    }
    s.urls.Set(key, url)
    return nil
}

汇总

slave 节点使用 ProxyStore,只有 master 使用 URLStore。有鉴于创造它们的方式,它们看上去十分一致:两者都实现了相同签名的 GetPut 方法,因此我们可以指定一个 Store 接口来概括它们的行为:

type Store interface {
    Put(url, key *string) error
    Get(key, url *string) error
}

现在全局变量 store 可以成为 Store 类型:

var store Store

最后,我们改写 main() 函数以便程序只作为 master 或 slave 启动(我们只能这么做,因为现在 store 是 Store 接口类型!)。

为此我们添加一个没有默认值的新命令行标志 masterAddr

var masterAddr = flag.String("master", "", "RPC master address")

如果给出 master 地址,就启动一个 slave 进程并创建新的 ProxyStore;否则启动 master 进程并创建新的 URLStore

func main() {
    flag.Parse()
    if *masterAddr != "" { // we are a slave
        store = NewProxyStore(*masterAddr)
    } else { // we are the master
        store = NewURLStore(*dataFile)
    }
    ...
}

这样,我们已启用了 ProxyStore 作为 web 前端,以代替 URLStore

其余的前端代码继续和之前一样地工作,它们不必在意 Store 接口。只有 master 进程会写数据文件。

现在可以加载一个 master 节点和数个 slave 节点,对 slave 进行压力测试。

编译这个版本 4 或直接使用现有的可执行程序。

要进行测试,首先在命令行用以下命令启动 master 节点:

./goto -http=:8081 -rpc=true    # (Windows 平台用 goto 代替 ./goto)

这里提供了 2 个标志:master 监听 8081 端口,已启用 RPC。

slave 节点用以下命令启动:

./goto -master=127.0.0.1:8081

它获取到 master 的地址,并在 8080 端口接受客户端请求。

在源码目录下已包含了以下 shell 脚本 demo.sh,用来在类 Unix 系统下自动启动程序:

#!/bin/sh
gomake
./goto -http=:8081 -rpc=true &
master_pid=$!
sleep 1
./goto -master=127.0.0.1:8081 &
slave_pid=$!
echo "Running master on :8081, slave on :8080."
echo "Visit: http://localhost:8080/add"
echo "Press enter to shut down"
read
kill $master_pid
kill $slave_pid

要在 Windows 下测试,启动 MINGW shell 并启动 master,然后每个 slave 都要单独启动新的 MINGW shell 并启动 slave 进程。

热门教程

最新教程