go test

使用TestMain 或自定义Test* 为测试增加前置\后置方法调用

  • 自定义testFun1, testFun2..方法 , test的t是小写, 这样不会被默认调用
  • 自定义Test*方法, Test的T 大写, 这样可以被go test 工具识别
  • 在Test*方法中按顺序 编写 t.Run("fun_name", fun)
  • 编写TestMain方法 , 调用beforeInit, m.Run()(会调用其它Test*方法) afterClear

    不写TestMain方法, 直接在 Test*方法中调用也一样,
    有TestMain方法时, go test 只会识别 TestMain, 所以要显示调用 m.Run() 来调用其它测试方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package test
import (
"bytes"
"fmt"
"os/exec"
"testing"
)

// cmd.go (写在一个文件中方法读代码)
// 待测试方法,
func ExecShell(s string) (string, error) {
cmd := exec.Command("/bin/bash", "-c", s)
var out bytes.Buffer
cmd.Stdout = &out
err := cmd.Run()
if err != nil {
return "", err
}
return out.String(), nil
}

func testExecShell(t *testing.T) {
t.Log("testExecShell")
_, err := ExecShell("ls")
if err != nil {
t.Error(err)
}
}

func testFun1(t *testing.T) {
t.Log("testFun1")
}

func beforeInit() {
fmt.Println("beforeInit")
}

func afterClear() {
fmt.Println("afterClear")
}

func TestAll(t *testing.T) {
t.Run("testExec..", testExecShell)
t.Run("testfun1", testFun1)
}

// 在测试中增加前置/后置函数调用的方法, 思路: 自定义测试的新入口,
// 1 自定义testFun1, testFun2..方法 , test的t是小写, 这样不会被默认调用
// 2 自定义Test*方法, Test的T 大写, 这样可以被go test 工具识别
// 3 在Test*方法中按顺序 编写 t.Run("fun_name", fun)
// 4 编写TestMain 方法 , 调用beforeInit, m.Run()(会调用其它Test*方法) afterClear
// 当前不写TestMain方法, 直接在 Test*方法中调用也一样
func TestMain(m *testing.M) {
fmt.Println("TestMain")
beforeInit()
m.Run()
afterClear()
//m.Run()
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// go test -v 
---------------------
TestMain
beforeInit
=== RUN TestAll
=== RUN TestAll/testExec..
cmd_test.go:25: testExecShell
=== RUN TestAll/testfun1
cmd_test.go:33: testFun1
--- PASS: TestAll (0.01s)
--- PASS: TestAll/testExec.. (0.01s)
--- PASS: TestAll/testfun1 (0.00s)
PASS
afterClear
ok com.huoli.saas-manager/src/common/util/test 0.022s


自实现 Goroutine pool

自实现 Goroutine pool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
// limiter.go 

package limiter

import (
"fmt"
"go.uber.org/zap"
"station.grab/src/common"
"sync"
"time"
)

const (
// MinimaLimit is the minimal concurrency limit
MinimaLimit = 5
)

// Job is an interface for add jobs.
type Job interface {
Run() (resp string, err error)
}

// EasyLimiter object
type EasyLimiter struct {
semp chan struct{} // 控制并发的chan

wg sync.WaitGroup // waitGroup 用于等待协程执行完成, 并关闭通道\清理资源

jobChan chan Job // Job 队列(实现接口即可, 解耦了任务的具体实现)
resultChan chan interface{} // job执行结果队列
}

func NewEasyLimiter(taskCount, limit int) *EasyLimiter {
if limit <= MinimaLimit {
limit = MinimaLimit
}

c := &EasyLimiter{
semp: make(chan struct{}, limit),
resultChan: make(chan interface{}, taskCount),
jobChan: make(chan Job, taskCount),
}

// 创建后马上就监听job队列
// job队列中有数据且semp队列未满 (满了会阻塞,以此来实现并发控制), 则取出job对象, 交给单独协程处理
go func() {
for job := range c.jobChan {
//c.semp <- struct{}{}

select {
case c.semp <- struct{}{}:

case <-time.After(time.Millisecond * 200):
common.ZLogger.Info("goroutine pool full, wait for 200 mis ", zap.Int("size", len(c.semp)))
}

go func(ajob Job) {
defer func() {
c.wg.Done()
<-c.semp
}()
//common.ZLogger.Info("开始执行任务")
result, err := ajob.Run()
//common.ZLogger.Info("完成执行任务")
if err != nil {
fmt.Printf("err:%v", err)
}
c.resultChan <- result

}(job)
}
common.ZLogger.Info("task队列关闭")
}()

return c
}

func (c *EasyLimiter) AddJob(job Job) {
c.wg.Add(1)
c.jobChan <- job
//common.ZLogger.Info("添加任务")
}

func (c *EasyLimiter) Wait() {
// 关闭job队列 ,此时已不会再添加
close(c.jobChan)
c.wg.Wait()
// 关闭result队列,以保证range方式读取chan 程序会正常向下执行
close(c.resultChan)
}


测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93

// limiter_test.go

package limiter

import (
"fmt"
"go.uber.org/zap"
"math/rand"
"station.grab/src/common"
"testing"
"time"
)

func Max(a int, b int) int { //注意参数和返回值是怎么声明的

if a > b {
return a
}
return b
}
func RandomInt(n int) int {
rand.Seed(time.Now().UnixNano())
v := rand.Intn(n)
if v < 1 {
v = 1
}
return v
}

type SampleJob struct {
total int
idx int
key string
}

func (s SampleJob) Run() (resp string, err error) {

v := fmt.Sprintf("job run: %d/%d, %v", s.idx, s.total, s.key)
common.ZLogger.Info(v)
//time.Sleep(time.Second*time.Duration(RandomInt(5)))
time.Sleep(time.Second * 1)

return v, nil
}

func TestLimiter_Execute(t *testing.T) {

fmt.Println("begin")
total := 20
limiter := NewEasyLimiter(total, 5)

for i := 0; i < total; i++ {
limiter.AddJob(&SampleJob{
total: total,
idx: i,
key: "test",
})
}

// 控制完成并退出的信号
chanSignal := make(chan interface{})

go func() {
for {
select {
case result, ok := <-limiter.resultChan:
common.ZLogger.Info("read from result chan ", zap.Any("result", result))
if !ok {
common.ZLogger.Info("result通道关闭,退出当前goroutine")
chanSignal<- 1
return
}
}
}
}()

limiter.Wait()

<-chanSignal

fmt.Println("done")
}

func TestRandom(t *testing.T) {
for i := 0; i < 20; i++ {
fmt.Println(RandomInt(5))
}

}



利用有缓冲通道写满时阻塞的特点来控制协程的并发数量

利用有缓冲通道写满时阻塞的特点来控制协程的并发数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
type LineGraber struct {
//最多允许n个并发同时执行
ConcurrentNum int `json:"concurrent_num"`
}

func (grab LineGraber) Work() {

// "work start"

//最少5个协程一起工作
concurrentNum := grab.ConcurrentNum
if concurrentNum <= 0 {
concurrentNum = 5
}
// sem通道容量限制了并发的协程数
sem := make(chan struct{}, concurrentNum)

// 准备待处理的任务列表
taskLst := grab.buildTaskLst()

// 接口任务处理结果的通道
chanRespv := make(chan string, len(taskLst))

total := len(taskLst)
// log info fmt.Sprintf("tasksize:%d", total)

var wg sync.WaitGroup
for idx, key := range taskLst {
wg.Add(1)
go func(taskv string, i int) {
// 确保 wg.Done() (与add对应),
// <-sem, 从semchan中取出(与下面 sem<- struct{}{}对应)
defer func() {
wg.Done()
<-sem
}()
sem <- struct{}{}
// 真实业务逻辑部分
// 方法中要将结果写入到chanRespv
// log info i, taskv
grab.processTask(taskv, oneproxy, chanRespv)

}(key, idx)
}

wg.Wait()
close(chanRespv)

// range 可以从已经关闭的chan 中读取原有数据, 并退出
for respv := range chanRespv {
// log info read from chanResp
var respResult RespResult
err := json.Unmarshal([]byte(respv), &respResult)
if err != nil {
// log error decode异常
continue
}
// process ...

}

// log info ("work done")

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

// 以上方法中在 wg.Wait之后才开始接收 chanRespv ,其实还可以优化\尽早提前处理chanRespv(可以及时读取到结果,避免所有任务完成后才读取到结果的情况), 只需新启协程在wg.Wait 之前接收处理 .
// 要增加另外一个通道, 控制完成并退出的信号

// 在wg.wait 之前
chanSignal := make(chan interface{})
go func() {
for {
select {
case result, ok := <-limiter.resultChan:
common.ZLogger.Info("done", zap.Any("result", result))
if !ok {
common.ZLogger.Info("result通道关闭,退出当前goroutine")
// 通知主线程,可以继续向下执行了
chanSignal<- 1
return
}
}
}
}()

// 在程序最后添加
<-chanSignal


1
2
3
4
5
6
7
8
9
10
11
12
13
14

// 业务逻辑处理
// 注意chan 类型是只可写入 chan <- string
func (grab LineGraber) processTask(key string, proxy string, outchan chan<- string) {
aimUrl := fmt.Sprintf(grap_base_url, key)
respv, err := doget...
if err != nil {
// log error ...
}
//log info ...
outchan <- respv
}


第一次为开源项目贡献pull request ,并被作者采用

第一次为开源项目贡献pull request ,并被作者采用

项目地址: Shelnutt2/db2struct

1
2
3
4
5
6
7
8
9
    在golang的学习实践中用到了 ORM框架 GORM,  是一个优秀的开源框架, star人数很高, 在编写与之对应的mode对象时, 就自然遇到了由mysql数据表生成model的需求,
在github上找到这 Shelnutt2/db2struct 这个开源项目, 太感谢作者了, 解决了这一痛点问题.
随着对代码的深入,发现db2struct 还可以有两点改进, 所以有了此次的修改,以及pull request , 分两次解决了如下的改进,
作者也有nice,并最终合并的pr .
1 由table生成的model中的字段默认是按字段名排序的, 我们更希望的是以数据表ddl 的顺序为准
2 model 的tag 并未指定column ,这样在遇到非默认字段名规则时会关联失败
3 还有一个小改进,就是把ddl的comment 添加到model中



pr preview

py37 pandas升级后提示缺少bz2

py37 pandas升级后提示缺少bz2

参考 解决ModuleNotFoundError: No module named '_bz2'

1
2
3
4
5
在python3环境中默认安装的pandas包是最新的像, 像1.14.. ,而高版本的pandas0.23以后,会将bz2的引用放在头部 , 低版本的会在方法内按需引用 , 所以用高版本,会依赖bz2
需要系统级安装依赖来解决,如 yum install bzip2 libbz2-dev , 而对于已经安装好的python环境, 可以参考以下网友分享的方法, 大概步骤如下:
1 /python36/lib/python3.6/lib-dynload/x _bz2.cpython-37m-x86_64-linux-gnu.so
2 ln -s /usr/lib64/libbz2.so.1.0.6 /usr/lib64/libbz2.so.1.0

'field' is both an index level and a column label, which is ambiguous. 错误 , 需要重建 index
1
2
3
4
pandas0.23.1 以后版本,在pandas.merge或其它方法中会报 'field' is both an index level and a column label, which is ambiguous. 错误 , 需要重建 index
subfrm1 = subfrm1.reset_index(drop=True)