PHP多线程之Worker-Stackable

PHP多线程之Worker-Stackable · Oct 30, 2013 573 clicks

PHP的pthreads扩展的文档几乎等同于没有,估计作者也没怎么用心去做这个事。
研究了一下Thread,Worker,Stackable这几个类的用法,全靠一行行地去试代码。。。。

网上找了一个例子,基本上也只有这一个,我把网上的代码简化了一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
<?php
class MyStackable extends Stackable{
    public $id,$name;
    public function __construct($id){
        $this->id = $id;
        $this->name = 'Stackalbe_'.$id;
    }
    public function run(){
        $start = time();
        $wait = intval(rand(1,5));
        sleep($wait);
        $last = time()-$start;
        if($this->worker){
            $str = sprintf("Running %s in %s costs %d secs.\n",$this->getName(), $this->worker->getName(),$last );
            printf($str);          
            $this->worker->setData( $str );
            $this->worker->addAttempt();
        }else{
            printf("faild to get worker \n");
        }
    }
    public function getName(){
        return $this->name;
    }
    public function getId(){
        return $this->id;
    }
     
}
 
class MyWorker extends Worker{
    protected $id,$name,$_data=[];
    public function __construct($id){
        $this->name = 'Worker_'.$id;
        $this->id = $id;
        $this->attempts = 0;
    }
     
    public function getName(){
        return $this->name;
    }
     
    public function addAttempt(){
        $this->attempts ++;
    }  
    public function run(){
        printf("running Worker: %s (%lu)\n", $this->getName(), $this->getThreadId());
    }
    public function setData($str){         
        $this->_data[] = $str;
    }
    public function getData(){
        return join('',$this->_data);       
    }  
    public function getAttempts(){
        return $this->attempts;
    }
     
}
 
class CPool {
    public $workers = [];
    public $status  = [];
    protected $limit = 10;
    protected $length = 0;
    public $running = false;
     
     
    public function __construct($size){
        $this->limit = $size;       
    }
    protected function getWorker($no){
        $worker = new MyWorker($no);
        $worker->setData("worker inited \n");       
        $worker->start(PTHREADS_INHERIT_NONE);
        return $worker;
    }
    public function add(Stackable $stackable){
         
        if ( $this->length < $this->limit ){      
            printf("add item to stack\n");
            $id = $stackable->getId();
            $no = $id % 2;
            if ( empty($this->workers[$no]) ){
                $this->workers[$no] = $this->getWorker($no);
            }
            if($this->workers[$no]->stack($stackable)){
                $this->length ++;
                return true;
            }else
                printf("Worker[%d] is full.\n",$no);
                return false;
            }
        }else{
            return false;
        }
    }
    public function shutdown(){
        foreach($this->workers as $worker){
            printf("Worker %s shutdown[%d]\n",$worker->getName() ,$worker->shutdown());
        }      
    }
    public function getAttempts(){
        foreach($this->workers as $worker){
            printf("%s attempts %d times.\n",$worker->getName(),$worker->getAttempts() );
        }
    }
    public function getData(){
        foreach($this->workers as $worker){
            printf("%s 's  data is %s \n",$worker->getName(), $worker->getData() );
        }
    }
}
 
echo "Start...\n";
$start_time = microtime(1);
$pool = new CPool(5);
$works = [];   
for ( $i=0; $i<10; $i++){
    $st = new MyStackable( $i );
    $works[] = $st;//Notice!!!!
    $pool->add( $st );
}
         
 
$pool->shutdown();
 
$pool->getAttempts();
$pool->getData() ;
 
printf("Finished all in %f seconds.\n",microtime(1)-$start_time);

【参考: https://github.com/krakjoe/pthreads/blob/master/examples/Pooling.php#L33】

注意$works[] = $st;这一行, 我在这里吃了大亏,一般人看来这一行明显是多余的,但是如果少了这行立马就core了。

感觉pthreads的代码没有遵循zend标准,运行起来各种各样的怪问题都会冒出来。

从代码的运行结果可以看到:

1. 多个Worker之间是并行的,每个Worker的Stackable却是按照顺序(其实也并非“顺序”,因为不一定会从第一个Stackable开始运行)一个一个地运行的。

2. Worker中似乎不支持数组,上面代码中的getData,setData方法就没有效果,而把里面的数组操作改成字符串操作之后又能返回期待的内容了。

 

===============================

PS:据laruence说这个只能玩,不能应用在生产环境。看来多线程的场景还是用Python吧。

 Laruence:这玩意儿只能作为一种学习研究的东西, 可千万别在实际场景使用. 首先ZTS的PHP本身就要比NZTS的慢. 再者多线程对使用者有较高的要求, 调试也麻烦. 最后, 这东西还不够稳定

PHP