今回は,前回説明したプロセスを扱う上で重要な同期処理について説明する.
各プロセスは,生成すると独立に,実行を開始し,そのままでは系統的な 計算ができない.そのため,各プロセスをうまく制御する必要がある. あるプロセスを一時的に止めたり,再開したりする処理を 同期処理(synchronization) という.同期処理をしない場合は, 非同期処理(asynchronization) という.例えば, 一つのプロセスで,整数型変数の値ををインクリメント(1加える)し, もう一つのプロセスで,変数の値を表示するプログラムを考える. このプログラムの処理をタイムチャートを用いて表すと以下のようになる.
この例では,プロセス ProcA が,変数の値 0, 1, 2, 3, 4 とした所で, コンテキストスイッチが起こり,プロセス ProcB で値 4 を表示し, コンテキストスイッチが起こり,プロセス ProcA で 5, 6, 7 とした所で, コンテキストスイッチが起こり,プロセス ProcB で,同様に値 7 を表示している. このように,「どの値が表示されるか」は,時分割のコンテキストスイッチ次第と なる.この値を,1, 2, 3, 4, 5, 6, 7, のように,ProcB で表示させたい 時に,同期処理を行う.
この処理を Xinu 上で C 言語プログラムとして記述すると以下のようになる.
1: /* main_ex4.c - main_xinu */ 2: 3: #include <conf.h> 4: #include <kernel.h> 5: #include <user.h> 6: #include <proc.h> 7: 8: long n = 0; /* external variables are shared by all processes */ 9: 10: /*---------------------------------------------------------- 11: * main -- user main program 12: *---------------------------------------------------------- 13: */ 14: main_xinu() 15: { 16: int procA(), procB(); 17: resume( create(procA, 200, 20, "prod", 0 ) ); 18: resume( create(procB, 200, 20, "cons", 0 ) ); 19: } 20: 21: procA() 22: { 23: long i; 24: for( i=1; i<=200000000; i++ ) n++; 25: } 26: 27: procB() 28: { 29: long i; 30: for( i=1; i<=200000000; i++ ) printf("n is %d \n", n); 31: } |
8 行目で,2 つのプロセスで共有する変数 n を宣言している. 16, 17 行目で,create 関数,resume 関数を用いて,2 つのプロセス procA, procB を生成している.create 関数への引数は,第一引数,起動するプロセスと なる関数へのポインタ,第二引数,スタックサイズ,第三引数, プロセス名,第四引数,プロセスへ渡す引数の数である.このプログラムを実行 すると以下のようになる.
n is 6978 n is 6978 n is 6978 n is 13763 n is 13763 n is 13763 n is 26873 n is 26873 n is 26873 ... ...
この様に,順番に値が表示されるわけではなく, 適当に(コンテキストスイッチ次第),値が表示される. しかも,表示される値は,起動される度に異なる.実際の処理では, 変数のインクリメントの処理速度より,表示するための処理速度が相当遅いこと がわかる. 例えば,このプログラムで,値を 1, 2, 3, 4, … のように順番に表示させようと するのが同期処理である.
同期処理は,並列プログラミング,並列処理の要で,同期処理をうまく利用し, 効率よい並列プログラミングをする必要がある. 同期処理だけで一つの学問体系となる位,非常に幅広く,多くの方法が提案 されているが,本講義では,セマフォ(semaphore),モニタ(monitor) を 説明する.さらに,応用として,有限バッファ問題(bounded buffer problem) を紹介する.
多くの教科書では,並列プログラミング言語を用いて,同期処理を 説明しているが,本講義では,Xinu を中心に説明するので,C 言語を用いて 説明する.
最近では,代表的な同期処理がセマフォ(semaphore)である. オブジェクト指向言語 java の登場により,次に説明するモニタ(monitor) も多く用いられる.元々,セマフォは,単線の鉄道が衝突せずにするために 考えだされた信号機である.
線路 A を通る時には,「線路 A を通ってよい」という合図を受け取るまで待機し, 受け取った後走行できる.同様に,線路 B を通るには,「線路 B を 通ってよい」という合図を受け取るまで待機し,受け取った後, 走行できる.
この考えのキーは,合図を線路を通ることができる汽車の分(この場合は, 単線であるので1つ) しか与えないことである.もし,それ以上の合図を 与えた場合には,衝突してしまう.
プロセスの場合を考えてみよう.
procA() { long i; for( i=1; i<=200000000; i++ ) { n++; } } procB() { long i; for( i=1; i<=200000000; i++ ) { printf("n is %d \n", n); } } |
procA 関数と,procB 関数を各々プロセスとして起動し, n++ と,printf("n is %d \n",n); が交互に 処理されるようにすればよい.以下のプログラムを考えてみよう.
procA() { long i; for( i=1; i<=200000000; i++ ) { PA ; n++; VB ; } } procB() { long i; for( i=1; i<=200000000; i++ ) { PB ; printf("n is %d \n", n); VA ; } } |
ここで,P 命令 は, 合図が来るまで待機し,合図が来たら処理を再開する命令である. 添字をもつPA は,プロセス procA のための待機命令で あることを示す.V 命令は,逆に,対応する P 命令の待機状態を解除し, 処理を再開させることを意味する.上のように P 命令,V 命令を同期させたい 部分(際どい部分(critical section)または, 際どい区域(critical region)という) にはさむことにより, 望む同期処理が可能となる.これらの命令を総称して P, V 命令 という. この考え方は,E.W.Dikstra によって考案された.セマフォを用いて 待機状態にする場合は,他のプロセスへ CPU の実行権を渡すことができる ので,効率のよいプログラミングが可能となる.
セマフォは,P, V 命令への引数として渡される.つまり,上のような 単線の鉄道では,2 つの信号機(semaphore) が必要で,P,V 命令で指定する 必要がある.大きく分けて,2 種類ある.
2 値セマフォは,セマフォの状態として 2 値(「未実行」,「実行」など, 0, 1 で表現する場合も多い) をもつものである. 計数セマフォは,より一般化させ,N 種類の値を持つセマフォである.
以下では,P,V 命令の P 命令に wait,V 命令に signal を用いる. 以下のプログラムとなる.
procA() { int i; for( i=1; i<=20000; i++ ) { wait(semA ); n++; signal(semB ); } } procB() { int i; for( i=1; i<=20000; i++ ) { wait(semB ); printf("n is %d \n", n); signal(semA ); } } |
このように,P, V 命令の場合と全く同様に記述することができる. セマフォは,2 つある.
各々,二つの値を持っていることがわかる.wait 命令は,セマフォの値を 確認し,「実行」であれば,他のプロセスが wait 以下を実行しているので, 待機する.この待機状態は,いずれ,対応する signal 命令により解除され 再開される.そうでなければ,wait 以下をすぐ実行することが可能であるため, セマフォの値を「実行」にする. signal 命令は,対応する wait 命令の位置で処理が「待機」状態にあれば, 処理を再開し,セマフォの値を「実行」にする.そうでなければ, 対応する wait 以下を実行する処理がないので,セマフォの値を「未実行」にする. このように 2 値セマフォは,「実行」,「未実行」の 2 つの状態を持つ. wait 命令,signal 命令は以下のような処理となる.
signal(セマフォ識別子) { if( セマフォ識別子で示すセマフォで「待機」しているプロセスがある ) { 待機している処理を再開する(「実行」状態にする). セマフォの値を「実行」にする. } else セマフォの値を「未実行」にする. return(OK); } wait(セマフォ識別子) { if( セマフォ識別子で示すセマフォの値が「実行」 ) 「待機」状態にする. else セマフォの値を「実行」にする. return(OK); } |
各プロセス procA,procB のためのセマフォ 2 つの初期値を各々 semA は「未実行」,semB は「実行」とする. procA では,セマフォ(semA) の値が「未実行」であるため待機せずに, セマフォの値を「実行」にし,wait 以下の処理を実行する. n++ を処理した後,signal(semB) を実行する. 2 度目は,semAの値が「実行」であるため,wait 命令で待機する. プロセス procB では,セマフォの (初期)値(semB) が「実行」でかつ,wait 命令で「待機」状態であるので, 処理が再開され,semB の値を「実行」にする. そして,printf("n is %d \n", n); で変数 n の値を表示し, signal(semA) を実行する.再び,プロセス procA では, 待機していた処理が再開される.この処理の様子をタイムチャートで表すと以下 のようになる.
2 値セマフォを用いて同期処理をする場合は,このように際どい部分を wait,signal で挟めばよい.しかし,この例では,全体として 2 つのプロセス しか考慮しておらず,3 つ以上のプロセスが際どい部分の処理を しようとする場合は,少し改良が必要である.
少し,話題を変えて,一つの処理を同時に,一つのプロセスにしか処理させたく ない場合を考えよう.マルチタスキング OS では,以下のように,複数のプロセス が一つの処理を同時にアクセスすることができる.
際どい部分を wait と signal で挟むことにより,同時に一つのプロセスしか 処理ができないようにすることができる.この同期処理を 相互排除(mutual exclution) という.以下のように記述する.
func() { wait(semA ); 際どい部分; signal(semA ); } |
相互排除は,セマフォ一つ生成すればよい.wait 命令で,「待機」状態と なるプロセスが,複数となる可能性があるので,「待機」状態から処理を再開させる signal 命令に少し改良が必要である.
signal(セマフォ識別子) { if( セマフォ識別子で示すセマフォで「待機」しているプロセスがある ) { 待機しているプロセスの中から一つ選び処理を再開する(「実行」状態にする). セマフォの値を「実行」にする. } else セマフォの値を「未実行」にする. return(OK); } |
この場合も,セマフォの値は,2値でよいので,こちらの定義が 2 値セマフォのための signal 命令の正しい記述である. ここで注意が必要なのは,
点である.実際の実現では,順番に再開する場合も多いが, 「順番」を前提に,プログラミングしてはならない.
上の同期処理では,際どい部分で「待機状態」となるプロセスが複数あることが わかった.さらに,セマフォの値は 2値 であった.しかし, セマフォの値を N 種類にすることにより,待機状態にあるプロセスの 数を数えたり,同時に M プロセスの実行を許したりすることが可能となる. セマフォ値を2 値に限定するのではなく,N 種類にするセマフォ を計数セマフォ(counting semaphore) という. 計数セマフォは,有限バッファ問題(bounded buffer problem) を 使って説明するとわかりやすいので,最初に有限バッファ問題を紹介する.
有限バッファ問題は,以下のような限られた(有限の) メモリ領域(配列と考えればよい) にどのように,値を出し入れすれば, うまく同期処理を記述することができるかを問う問題である.
ここで,問題なのは,以下の 2 点である.
「情報を入れる側」は,バッファの空個数を値として持つ計数セマフォ, 「情報を出す側」は,バッファに入っている個数を値として持つ計数セマフォを 生成する.こうすることにより,計数セマフォの値が負の数になった場合に, 処理が待たされるようになる.この仕組みを wait 命令にすればよい.よって, 計数セマフォの signal,wait 命令は以下のように定義することができる.
signal(セマフォ識別子) { if( セマフォ識別子で示すセマフォの値が負の数 ) { セマフォの値をインクリメント. 待機しているプロセスの中から一つ選び処理を再開する(「実行」状態にする). } else セマフォの値をインクリメント. return(OK); } wait(セマフォ識別子) { セマフォ識別子で示すセマフォの値をデクリメント. if( セマフォの値が負の数 ) 「待機」状態にする. return(OK); } |
計数セマフォでは,通常, セマフォの値が正の数の場合は,際どい部分を実行している プロセスの数を表し,セマフォの数が負の数の場合は,際どい部分の 最初で待機しているプロセスの数を表している.
wait 関数は,セマフォの値をディクリメントしてから,さらに, 負の数になったら「待機」状態にしている.例えば,0 の場合は (ディクリメントし,負の数になるので),「待機」状態となり, 1 の場合は,「待機状態」とはならない. セマフォの値は,際どい部分を実行することができるプロセスの 数を表していることがわかる.
計数セマフォは,2 値セマフォを包含していることがわかる.そのため, 実際の実現では,計数セマフォのみが実現されている.
Java により,上の様子を実行するプログラムを作成した.「スクロールバー」 を移動させることにより,入力,出力の処理速度を変化させることができる.
最近では,Java の普及により,同期処理としてモニタ(monitor) を用いる場合が多い.モニタは,上で説明した 2 値セマフォである. もともと,Hore により,Concurrent Pascal という言語を設計し,開発した 時に提案された同期手法である.モニタのガード(guard)(wait 命令に相当) で,際どい部分の処理が実行中の場合,待機させられ,際どい部分の処理が終了後, 処理が再開される.signal 命令に相当する命令は,notify 命令という. オブジェクト指向言語 Java には,並行プログラミングが可能であるため, 言語仕様としてモニタが組み込まれている.
2 値セマフォと同様に,待機させられた処理(プロセス) を「適当に」選び, 再開させるので,注意が必要である(必ずしも,待機させられた「古い」順では ない).
では,Xinu 上で同期処理をするプログラムを書いてみよう.まず, 2 つのプロセスで,共有変数 n の値をインクリメントするプロセス ProcA, n の値を表示するプロセス ProcB を書いたプログラムは以下のようであった.
/* main_ex4.c - main_xinu */ #include <conf.h> #include <kernel.h> #include <user.h> #include <proc.h> long n = 0; /* external variables are shared by all processes */ /*---------------------------------------------------------- * main -- user main program *---------------------------------------------------------- */ main_xinu() { int procA(), procB(); resume( create(procA, 200, 20, "prod", 0 ) ); resume( create(procB, 200, 20, "cons", 0 ) ); } procA() { long i; for( i=1; i<=200000000; i++ ) n++; } procB() { long i; for( i=1; i<=200000000; i++ ) printf("n is %d \n", n); } |
Xinu では,P,V 命令に相当する関数は,用意されていない.計数セマフォ を扱う,wait 関数,signal 関数が用意されているのでこれらの関数 を使えばよい.計数セマフォを生成する関数は,screate である. 共有変数 n の値を順に表示するには,以下のようにすればよい.
/* main_ex5.c - main_xinu */ #include <conf.h> #include <kernel.h> #include <user.h> #include <proc.h> long n = 0; /* external variables are shared by all processes */ /*-------------------------------------------------------- * main -- user main program *-------------------------------------------------------- */ main_xinu() { int procA(), procB(); int consumed, produced; produced = screate(1); consumed = screate(0); resume(create(procA, 200, 20, "prod", 2, produced, consumed)); resume(create(procB, 200, 20, "cons", 2, produced, consumed)); } procA(int produced, int consumed) { int i; for( i=1; i<=20000; i++ ) { wait(produced); n++; signal(consumed); } } procB(int produced, int consumed) { int i; for( i=1; i<=20000; i++ ) { wait(consumed); printf("n is %d \n", n); signal(produced); } } |
produced と consumed の 2 つのセマフォを生成している.produced は, プロセス ProcA のためのセマフォ,consumed は,プロセス ProcB のための セマフォである.関数 screate の引数は,計数セマフォの初期値を示す. procB のためのセマフォ consumed の初期値が 1 のため,procB の本体 printf("n is %d \n", n); を一度実行し,procA の本体 n++; を実行する....のように実行が進む.
相互排除は,際どい部分を wait,signal で挟めばよい.
procA(int mutex) { wait(mutex); 際どい部分; signal(mutex); } |
ただし,この計数セマフォの初期値は,1 とする.
int mutex; mutex = screate(1); resume(create(procA, 200, 20, "prod", 1, mutex));
Xinu の同期処理は,計数セマフォを用いる.計数セマフォでは,セマフォの 値が負の数の場合,|負の数|個のプロセスが待機状態であるので, 処理を再開する場合,古い「順番に」処理を再開する.そのため,レディリストと 同様の,待ち行列(待機リスト)が必要となる. プロセスの状態に,「待機」状態を追加する.
計数セマフォの実現では,2 章プロセス管理で説明した,キューを用いて 「待機リスト」を実現している.
Linux は,スレッドを用いたプログラミングライブラリを標準で備えている. そのため スレッドプログラミングを手軽に行うことができる.
pthread ライブラリといい,IEEE の規格に基づいたプログラミングをすること ができる.各自,自分の Linux を使って実行してみよう.
スレッドを生成するためには,pthread_create 関数を使う.
/* test.c */ # define _REENTRANT # include <stdio.h> # include <pthread.h> unsigned int n; int procA(); int procB(); int main() { pthread_t pA, pB; pthread_create(&pA,NULL,(void*)procA,NULL); pthread_create(&pB,NULL,(void*)procB,NULL); pthread_join(pA,NULL); pthread_join(pB,NULL); pthread_exit((void *)0); return 0; } int procA() { int i; for(i=0;i<1000000000;i++) n++; } int procB() { int i; for(i=0;i<1000000000;i++) printf("n=%d\n",n); } /* end of file */ |
pthread ライブラリによりスレッドを識別する型 pthread_t が新たに宣言 されている.pthread_join 関数は,2 つのスレッドが終了しないうちに, main 関数が終了しないように「待ち合わせる」機能を持つ.
このプログラムを実行するためには,プログラムファイルを test.c とすると以下の ようにタイプする.
prompt% gcc test.c -lpthread prompt% ./a.out i=116734 i=116734 i=116734 i=116734 i=116734 . . . i=387367 i=387367 i=387367 i=387367 i=387367 i=387367 . . .
最近の PC は,非常に速いので,最初のコンテキストスイッチが起こるまでに, 既に,10 万回以上のカウントが処理されていることがわかる.
ライブラリとして pthread を指定する(l オプション).P,V 命令は,各々 pthread_mutex_lock 関数,pthread_mutex_unlock 関数がある.以下のようになる.
/* test1.c */ # define _REENTRANT # include <stdio.h> # include <pthread.h> unsigned int n; int procA(); int procB(); int main() { pthread_t pA, pB; pthread_mutex_t mutex[2]; pthread_mutex_t mutex_procA, mutex_procB; pthread_mutex_init(&mutex_procA,NULL); pthread_mutex_init(&mutex_procB,NULL); mutex[0] = mutex_procA; mutex[1] = mutex_procB; pthread_create(&pA,NULL,(void*)procA,(void*)mutex); pthread_create(&pB,NULL,(void*)procB,(void*)mutex); pthread_join(pA,NULL); pthread_join(pB,NULL); pthread_exit((void *)0); return 0; } int procA(pthread_mutex_t mutex[]) { int i; for(i=0;i<=10000;i++) { pthread_mutex_lock(&mutex[0]); n++; pthread_mutex_unlock(&mutex[1]); } } int procB(pthread_mutex_t mutex[]) { int i; for(i=0;i<=10000;i++) { pthread_mutex_lock(&mutex[1]); printf("n=%d\n",n); pthread_mutex_unlock(&mutex[0]); } } /* end of file */ |
2 つの pthread_mutex_t 型の値を生成している.この値は, pthread_mutex_init 関数で初期化しなければならない.この関数は, 元々,pthread_mutex_t 型の値を 1 つ生成して, 「相互排除」を行うための関数である.
最後に,計数セマフォを実行する.Linux には,sem_wait 関数,sem_post 関数 を用いて,計数セマフォを実行することができる. sem_wait 関数や,sem_post 関数が用意されていない場合は, 上の pthread_mutex_lock 関数や,pthread_mutex_unlock 関数を使って, 代用することができる. これらのプログラムは,多少難しいかもしれないが,じっくり考えてみよう.
/* test2.c */ # define _REENTRANT # include <stdio.h> # include <unistd.h> # include <pthread.h> # include <semaphore.h> static unsigned int i; void *producer(sem_t arg[]) { for(i=0;i<1000000000;) { sem_wait(&arg[0]); i++; sem_post(&arg[1]); } } void *consumer(sem_t arg[]) { int j; for(j=0;j<1000000000;j++) { sem_wait(&arg[1]); printf("i=%d\n",i); sem_post(&arg[0]); } } int main() { pthread_t prod, cons; sem_t sem0, sem1; sem_t buf[2]; sem_init(&sem0,(int)0,(unsigned int)1); // producer sem_init(&sem1,(int)0,(unsigned int)0); // consumer buf[0] = sem0; buf[1] = sem1; pthread_create(&prod,NULL,(void*)producer,(void*)buf); pthread_create(&cons,NULL,(void*)consumer,(void*)buf); pthread_join(prod,NULL); pthread_join(cons,NULL); pthread_exit((void *)0); } /* end of file */ |
/* test3.c */ # define _REENTRANT # include <stdio.h> # include <pthread.h> unsigned int n; # define OK 1 # define ERROR 0 /* semaphore routine starts */ typedef struct sem_table { pthread_mutex_t mutex; pthread_cond_t cond; int value; } sem_table; typedef sem_table* semaphore; semaphore screate(int value) { semaphore st = (semaphore)malloc(sizeof(sem_table)); st->value=value; pthread_mutex_init(&(st->mutex),NULL); pthread_cond_init(&(st->cond),NULL); return(st); } int sdelete(semaphore sem) { pthread_mutex_destroy(&(sem->mutex)); pthread_cond_destroy(&(sem->cond)); free(sem); return(OK); } int wait(semaphore sem) { pthread_mutex_lock(&(sem->mutex)); if( --(sem->value) < 0 ) { pthread_cond_wait(&(sem->cond),&(sem->mutex)); } pthread_mutex_unlock(&(sem->mutex)); return(OK); } int signal(semaphore sem) { pthread_mutex_lock(&(sem->mutex)); if( (sem->value++) < 0 ) { pthread_cond_signal(&(sem->cond)); } pthread_mutex_unlock(&(sem->mutex)); return(OK); } /* end of semaphore routine */ int procA(); int procB(); int main() { pthread_t pA, pB; semaphore sem[2]; sem[0] = screate(1); sem[1] = screate(0); pthread_create(&pA,NULL,(void*)procA,(void*)sem); pthread_create(&pB,NULL,(void*)procB,(void*)sem); pthread_join(pA,NULL); pthread_join(pB,NULL); pthread_exit((void *)0); return 0; } int procA(semaphore sem[]) { int i; for(i=0;i<=10000;i++) { wait(sem[0]); n++; signal(sem[1]); } } int procB(semaphore sem[]) { int i; for(i=0;i<=10000;i++) { wait(sem[1]); printf("n=%d\n",n); signal(sem[0]); } } /* end of file */ |
新しい関数などの説明は,pthread ライブラリ関数を参照のこと.
同期処理は,並列プログラミングに不可欠な概念である.さらに,進んだ同期処理 としては,いくつかの問題が提案されている.
同期処理の待ち合わせについて、もう少し考えてみよう。 P、V 命令を使った同期処理は、以下のように、(広域)変数を一つ用意する だけで実現することができる。
/* test4.c */ # define _REENTRANT # include <stdio.h> # include <unistd.h> # include <pthread.h> static unsigned int i; static int global_prod=0; static int global_cons=1; void *producer() { for(i=0;i<1000000000;) { while(global_prod); global_prod=1; i++; global_cons=0; } } void *consumer() { int j; for(j=0;j<1000000000;j++) { while(global_cons); global_cons=1; printf("i=%d\n",i); global_prod=0; } } int main() { pthread_t prod, cons; pthread_create(&prod,NULL,(void*)producer,NULL); pthread_create(&cons,NULL,(void*)consumer,NULL); pthread_join(prod,NULL); pthread_join(cons,NULL); pthread_exit((void *)0); } |
producer 関数,consumer 関数のために, 各々(広域)変数 global_prod,global_cons を用意する. これらが,セマフォに相当し,待ち合わせに使う. 各待ち合わせは,変数の値が 0 (FALSE) になるまで待つ. このプログラムを実行すると以下のようになる.
prompt% gcc test4.c -lpthread prompt% ./a.out i=1 i=2 i=3 i=4 i=5 . . . i=1022 i=1023 i=1024 i=1025 i=1026 i=1027 . . .
このように,セマフォを使った場合と同様に,1 から順番に変数の 値が更新される.
しかし,セマフォを使った場合と違い,待ち合わせの部分,
および,
で,CPU の計算(計算資源) が消費される.そのため,非常に効率が悪い. この待ち合わせを,ビジーウエイト(busy wait) または, スピンロックという. 通常の,並列/並行プログラムでこの方式を使う場合は注意が必要である. 一方,セマフォなどの同期機構を使う場合は, OS の機能より,「待機状態」に遷移し,CPU の計算資源を消費しない. スピンロックは,デバイス(装置) の待ち合わせには,よく用いられる.