Table of Contents

เราจะทดลองเขียนโปรแกรมแบบ concurrent บน Erlang

การบ้านจะอยู่ปนกับส่วนโปรแกรมที่ให้ทดลอง ด้านบนนี้จะรวมลิงก์ไปยังการบ้านแต่ละข้อที่อยู่ในเนื้อหาด้านล่าง


การแสดงผลและการสร้างโปรเซส

ก่อนอื่นเราทดลองฟังก์ชันสำหรับแสดงผลลัพธ์ก่อน โดยฟังก์ชัน io:format ลักษณะการใช้งานคล้ายกับฟังก์ชัน printf อาร์กิวเมนต์แรกของฟังก์ชันคือรูปแบบการแสดงผล โดยรูปแบบครอบจักรวาลที่เราจะใช้คือ ~w ซึ่งแสดงข้อมูลทุกรูปแบบ อาร์กิวเมนต์ที่สองเป็นลิสต์ของข้อมูลที่ต้องการพิมพ์

ทดลองใช้เช่น

io:format("X = ~w\n",[10]).
io:format("I got: ~w\n",[{this,is,a,tuple}]).

เราจะเขียนโมดูล pex.erl (ย่อมาจาก process example, จะใช้ชื่ออื่นก็ได้ อย่าลืมเปลี่ยนในส่วนต่อ ๆ ไปด้วย)

-module(pex).
-export([gp/0]).
 
gp() ->
    receive
	Any ->
	    io:format("I got: ~w\n",[Any])
    end.

ทดลองใช้โดยสั่งดังตัวอย่างด้านล่าง

>>> c(pex).                   # บรรทัดที่ขึ้นด้วย >>> คือบรรทัดที่พิมพ์
{ok,pex}
>>> Pid = spawn(pex,gp,[]).   # สร้างโปรเซสใหม่ให้เรียกฟังก์ชัน pex:gp ทำงาน โดยส่ง argument ว่างไป
<0.51.0>
>>> Pid ! hello.              # ส่ง hello ไป  
I got: hello                  # โปรเซสพิมพ์ออกมา
hello                         # การส่ง (!) คืนค่าเป็น message 
>>> Pid ! hello.              # ส่งซ้ำ
hello                         # นิ่ง 
>>> Pid ! hello.              # ส่งซ้ำ
hello                         # นิ่ง

สังเกตว่าในรอบถัดไปที่เราส่ง hello ไป โปรเซสไม่ได้ทำงานอะไรแล้ว ทั้งนี้เนื่องจากเมื่อรับค่าได้การทำงานก็หมดลง

เราจะให้วนกลับมารับค่าใหม่ได้โดยการเรียกตัวเอง ฟังก์ชัน gp ที่รับค่าได้เรื่อย ๆ แสดงดังด้านล่าง แก้แล้วทดลองคอมไพล์ใหม่ (สั่ง c(pex).)

gp() ->
    receive
	Any ->
	    io:format("I got: ~w\n",[Any]),
	    gp()
    end.

อย่าลืมว่าถ้าจะ spawn ใหม่ ต้องเก็บค่า process id ไว้ในตัวแปรอื่น (ตัวแปร Pid ถูกใช้ไปแล้ว) ถ้าเราสั่ง Pid = spawn… อีก การ matching จะล้มเหลว

โปรเซสบวกเลข

เมื่อเราสั่ง receive โปรเซสจะหยุดรอ และจะทำงานเมื่อมี message ที่ match ได้กับ pattern ส่งมาหามัน เราจะทดลองสร้างโปรเซสง่ายในการบวกเลขและแสดงผล โดยการทำงานคร่าว ๆ จะเป็นดังนี้ โปรเซสดังกล่าวจะรับ message ในรูปแบบ {add, Value} เพื่อเพิ่มค่าผลรวม หรือ message output เพื่อสั่งให้พิมพ์ผลรวม

สังเกตว่าใน Erlang เราไม่มีแนวคิดของตัวแปรที่เก็บค่า “การเพิ่มค่า” ที่เรากล่าวถึงในข้างต้นจะต้องทำผ่านทางกระบวนการอื่น เช่นการเปลี่ยนค่าเมื่อเรียกตัวเอง ดังตัวอย่างด้านล่าง

adder(C) ->
    receive
	{add,Val} ->
	    adder(C+Val);
	output ->
	    io:format("~w\n",[C]),
	    adder(C)
    end.

หมายเหตุ: สังเกตรูปแบบการเขียน receive เราจะมี pattern, ภายใน pattern จะขั้นแต่ละขั้นตอนด้วย , คั่นระหว่าง pattern ด้วย ; และไม่มีการคั่นใน pattern สุดท้าย

จากตัวอย่างข้างต้น เราสามารถคิดได้ว่าฟังก์ชันจะใช้อาร์กิวเมนต์ C ในการเก็บค่าผลรวมปัจจุบัน (สังเกตว่าจะมีการปรับค่าเมื่อเรียก add แต่เมื่อเรียก output จะไม่มีการเปลี่ยนแปลงใด ๆ

ในการสร้างโปรเซสที่ทำงานด้วยฟังก์ชัน adder/1 เราจะต้องใส่ค่าให้ C ด้วย ซึ่งโดยปกติแล้วเรามักเริ่มที่ 0 ดังนั้นเพื่อความสะดวกเราจะสร้างฟังก์ชัน adder/0 ที่เรียกให้เราอัตโนมัติ

adder() -> adder(0).

เมื่อเพิ่มฟังก์ชันทั้งสองลงใน pex.erl แล้ว เราจะสามารถทดลองได้ดังด้านล่าง

>>> c(pex).
{ok,pex}
>>> P10 = spawn(pex,adder,[]).
<0.105.0>
>>> P10 ! output.
0
output
>>> P10 ! {add,10}.
{add,10}
>>> P10 ! {add,1000}.
{add,1000}
>>> P10 ! {add,50}.  
{add,50}
>>> P10 ! output.    
1060
output

เมื่อเกิดปัญหา??

สิ่งที่ยากที่สุดเรื่องหนึ่งในการเขียนโปรแกรมแบบ Concurrent คือการจัดการกับปัญหา (หรือการ debug) ในการเขียนโปรแกรมด้วย Erlang ก็เช่นเดียวกัน แม้ว่าอีกสักพักเราจะพัฒนารายการของฟังก์ชันสำหรับรับส่งที่ช่วยลดปัญหาไปได้หลายเรื่อง เราก็ยังอาจพบปัญหาอื่น ๆ อยู่ดี

ในส่วนนี้จะรวมรายการสำหรับตรวจสอบโปรแกรมเพื่อหาข้อผิดพลาดต่าง ๆ

การบ้าน 1. Stack

เขียนโมดูล stk ที่มีฟังก์ชัน stack ที่เมื่อสั่งให้ spawn process ที่ทำงานด้วยฟังก์ชันดังกล่าวแล้ว โปรเซสจะรองรับ message เหล่านี้

เมื่อเริ่มต้นให้ stack มีสถานะว่าง

ตัวอย่างการทำงานเช่น

1> c(stk).
{ok,stk}
2> P = spawn(stk,stack,[]).
<0.38.0>
3> P ! {push,10}.
{push,10}
4> P ! {push,20}.
{push,20}
5> P ! pop.       
20
pop
6> P ! pop.
10
pop
7> P ! {push,100}.
{push,100}
8> P ! pop.       
100
pop

สังเกตว่าถ้าเราสั่ง pop มากเกินจะได้ error เช่นดังด้านล่าง

9> P ! pop.

=ERROR REPORT==== 24-Sep-2009::14:08:38 ===
Error in process <0.38.0> with exit value: {{badmatch,[]},[{stk,stack,1}]}

การรับส่งระหว่างโปรเซส

ปกติแล้วโปรเซสจะไม่ตอบอะไรออกมาโดยการพิมพ์ (เช่นที่เราทำในกรณีของ adder) แต่จะส่งข้อมูลกลับมายังโปรเซสที่ส่งไปถาม อย่างไรก็ตามโปรเซสเมื่อรับ message มาจะไม่สามารถทราบว่าใครส่งมาให้โดยอัตโนมัติ ดังนั้นผู้ส่ง message จะต้องระบุ process ของตนเองไปด้วย ซึ่งทำได้โดยการเรียกฟังก์ชัน self()

ฟังก์ชัน adder ในโมดูล pex ของเราจะเปลี่ยนไปดังนี้

adder() -> adder(0).
adder(C) ->
    receive
	{Pid,add,Val} ->
	    Pid ! {self(),ok},
	    adder(C+Val);
	{Pid,output} ->
	    Pid ! {self(),C},
	    adder(C)
    end.

สังเกตว่า adder จะรับ message ที่ขึ้นต้นด้วย process id สำหรับส่งกลับ ดังนั้นการจะส่ง message ให้ adder เราจะต้องใส่หมายเลขโปรเซสเข้าไปด้วย เช่น

11> c(pex).
{ok,pex}
12> P1 = spawn(pex,adder,[]).
<0.53.0>
13> P1 ! {self(),add,100}.
{<0.31.0>,add,100}

ผลลัพธ์ที่ได้จะส่งกลับมาที่โปรเซส เราสามารถอ่านได้โดยสั่ง

receive Any -> Any end.

อย่างไรก็ตาม ถ้าเราสั่ง

P1 ! {self(),output}.

แล้วสั่ง

receive Any -> Any end.

โปรเซสเราจะค้าง!

คำถามย่อย: โปรแกรม adder เราทำงานได้ถูกต้องแล้ว แต่ว่าการอ่านค่าของเราผิด เราสั่ง receive ผิดอย่างไร?

วิธีการเอาตัวเองออกจากการค้างให้กด ^G เราจะเข้าสู่ mode พิเศษ โดยดู help ได้โดยกด h ให้กด j เพื่อ list job (น่าจะพบ job เดียว); จากนั้นสั่ง k 1 เพื่อ kill job นั้น; กด s เพื่อสร้าง shell; กด j จะเห็น job 2 เป็น shell ใหม่ที่เราสร้า; และสุดท้ายกด c 2 เพื่อเชื่อมกับ shell ใหม่นั้นกลับไปทำงานอื่น ๆ ต่อ

การ receive แบบมี timeout

เราสามารถเพิ่มส่วน after เพื่อระบุค่าที่จะคืนเมื่อรอรับนานเกินกว่าเวลาที่กำหนด (เป็นมิลลิวินาที) ตัวอย่างการใช้เช่น

  receive
    hello ->
      goodbye
  after 1000 ->
    byebye
  end.

โปรแกรมข้างต้นจะรอรับ message hello เป็นเวลา 1 วินาที ถ้ามีจะคืนค่า goodbye ถ้าไม่เช่นนั้นจะคืนค่า byebye

ดังนั้นเพื่อป้องกันการค้าง (block) เราสามารถสั่ง

receive Any -> Any after 0 -> error end.

ได้

เรามาทดลองต่อกับ adder

2> P1 = spawn(pex,adder,[]).           # สร้าง process ใหม่
<0.63.0>
3> receive Any -> Any after 0 -> error end.  # ทดลองรับดู
error                                        # ไม่มีของ
6> P1 ! {self(),add,100}.                   
{<0.60.0>,add,100}
7> receive Any -> Any after 0 -> error end.  # รับ
{<0.63.0>,ok}                                # ได้ ok
8> P1 ! {self(),add,200}.                    # ลอง add ใหม่
{<0.60.0>,add,200}                          
9> receive Any -> Any after 0 -> error end.  # รับ
{<0.63.0>,ok}                                # ได้ ok
10> P1 ! {self(),output}.                    # เรียก output
{<0.60.0>,output}
11> receive Any -> Any after 0 -> error end. # รับ
error                                        # error
13> P1 ! {self(),add,200}.                   # ส่ง add
{<0.60.0>,add,200}
14> receive Any -> Any after 0 -> error end. # รับ
{<0.63.0>,ok}                                # กลับมารับได้!

แต่ถ้าเราลองส่ง output ไปอีก ก็จะได้ error เช่นเดิมไปเรื่อย ๆ

Hint:

15> Any.
{<0.63.0>,ok}

ฟังก์ชันสำหรับเรียกใช้งาน

การจะส่ง message โดยต้องเรียกและรอรับเองนั้นค่อนข้างยุ่งยาก เรามาเขียนฟังก์ชันเพิ่มเติมให้การเรียก adder ทำได้ง่ายกัน

ฟังก์ชัน call ของเราจะเป็นดังนี้

call(Pid,Msg) ->
    Pid ! {self(),Msg},
    receive
	{Pid,Res} ->
	    Res
    after 1000 ->
	    timeout
    end.

สังเกตว่าเมื่อเรียกเสร็จจะมีการ receive โดยรอรับ message ที่ขึ้นต้นด้วยหมายเลข process ที่เราส่งไปเท่านั้น เราใส่ after ไปด้วย เพื่อจัดการกรณีไม่มี message ส่งกลับมา

ฟังก์ชันดังกล่าวก่อนจะใช้งานกับ adder ได้ต้องปรับฟังก์ชัน adder ให้รูปแบบของ message อยู่ในรูปแบบเดียวกันก่อน (สังเกตว่าตอนนี้ add รับแบบหนึ่งส่วน output รับอีกแบบหนึ่ง)

ฟังก์ชันที่แก้แล้วพร้อมด้วยฟังก์ชัน

adder() -> adder(0).
adder(C) ->
    receive
	{Pid,{add,Val}} ->
	    Pid ! {self(),ok},
	    adder(C+Val);
	{Pid,output} ->
	    Pid ! {self(),C},
	    adder(C)
    end.

ทดลองใช้ได้ดังนี้

18> P2 = spawn(pex,adder,[]).
<0.88.0>
23> pex:call(P2,{add,100}).  
ok
24> pex:call(P2,{add,100}).
ok
25> pex:call(P2,output).   
200
26> pex:call(P2,{add,1000}).
ok
27> pex:call(P2,output).    
1200
28> 

การบ้าน 2. Stack ที่ติดต่อโดยการส่ง message

สังเกตว่าฟังก์ชัน call ด้านบนทำงานไม่ได้ทำงานได้กับ adder เพียงอย่างเดียว ให้แก้โมดูล stk เดิม โดยแก้ฟังก์ชัน stack ให้ทำงานได้กับฟังก์ชัน call ด้านบน (ไม่ต้องเขียนฟังก์ชัน pex:call ใหม่)

ตัวอย่างเช่น

28> c(stk).
{ok,stk}
29> Ps = spawn(stk,stack,[]).
<0.110.0>
30> pex:call(Ps,{push,10}).
ok
31> pex:call(Ps,{push,20}).
ok
32> pex:call(Ps,{push,30}).
ok
33> pex:call(Ps,pop).      
30
34> pex:call(Ps,pop).
20
35> pex:call(Ps,{push,100}).
ok
36> pex:call(Ps,pop).       
100
37> pex:call(Ps,pop).
10

และสังเกตว่าถ้าโปรเซสตายไปก่อน ฟังก์ชัน pex:call เราจะคืนค่าเป็น timeout

38> pex:call(Ps,pop).

=ERROR REPORT==== 24-Sep-2009::15:32:10 ===
Error in process <0.110.0> with exit value: {{badmatch,[]},[{stk,stack,1}]}

timeout

กลุ่มของโปรเซสเพื่อประมวลผลงาน

ในส่วนนี้เราจะทดลองสร้างกลุ่มของโปรเซสเพื่อประมวลผลงานและวัดประสิทธิภาพที่เพิ่มขึ้นในกรณีที่เครื่องมี cpu แบบหลาย core

การบ้าน 3. ฟังก์ชัน is_prime

ให้เขียนฟังก์ชัน is_prime ในโมดูล prime ฟังก์ชันดังกล่าวรับจำนวนเต็มแล้วคือ true หรือ false ขึ้นกับว่าจำนวนเต็มดังกล่าวเป็นจำนวนเฉพาะหรือไม่

ตัวอย่างการใช้งาน

7> prime:is_prime(11).
true
8> prime:is_prime(10).
false
9> prime:is_prime(2). 
true
10> prime:is_prime(1020391).
false
11> prime:is_prime(127).    
true

ฟังก์ชันด้านล่าง เรียกใช้ is_prime กับลิสต์ของจำนวนเต็มตั้งแต่ 1 ถึง 1000000 แล้วแสดงเวลาการทำงาน

run_seq() ->
    statistics(runtime),
    lists:map(fun prime:is_prime/1,lists:seq(1,1000000,1)),
    {_,Time} = statistics(runtime),
    io:format("~p\n",[Time]).

ฟังก์ชัน statistics จะเก็บสถิติต่าง ๆ ของการทำงาน, ฟังก์ชัน lists:map เรียกฟังก์ชันกับสมาชิกทุกตัวของลิสต์

การบ้าน 4. โปรเซสสำหรับตรวจสอบจำนวนเฉพาะ

ให้เขียนฟังก์ชัน check ในโมดูล prime สำหรับสร้างโปรเซสที่รับลิสต์ของจำนวนเต็มแล้วส่งลิสต์ของ boolean ที่ระบุว่าจำนวนเต็มใดเป็นจำนวนเฉพาะกลับไป

โปรเซสที่สร้างจากฟังก์ชัน check ดังกล่าวต้องทำงานได้กับฟังก์ชัน pex:call ด้านบน เช่น

27> Pp = spawn(prime,check,[]).
<0.95.0>
28> pex:call(Pp,[1,2,3,4,5,6,7,8,9,10]).
[false,true,true,false,true,false,true,false,false,false]

การทำงานแบบหลายโปรเซส

ในส่วนนี้เราจะทดลองเขียนโปรแกรมที่ทำงานหลายโปรเซสเพื่อทดลองทำงานบนเครื่องที่ทำงานแบบหลาย core

ส่วนเขียนโปรแกรมในข้อนี้เป็นโบนัส (คิดเป็นข้อที่ 5) ไม่จำเป็นต้องทำ แต่ควรทดลองเป็นอย่างยิ่ง ถ้าไม่ต้องการเขียนในส่วนใดสามารถเข้าไปดูเฉลยได้ทันที

ด้านล่างคือฟังก์ชัน spawn_n ที่สร้างลิสต์ของโปรเซส N โปรเซส จากฟังก์ชันที่ระบุ

spawn_n(0,_,_,_,L) ->
    L;
spawn_n(N,Mod,Fun,Args,L) ->
    Pid = spawn(Mod,Fun,Args),
    spawn_n(N-1,Mod,Fun,Args,[Pid|L]).

spawn_n(N,Mod,Fun,Args) ->
    spawn_n(N,Mod,Fun,Args,[]).

ให้เพิ่มฟังก์ชันดังกล่าวลงในโมดูล pex และทดลองเรียกใช้

34> Pids = pex:spawn_n(20,prime,check,[]).
[<0.139.0>,<0.138.0>,<0.137.0>,<0.136.0>,<0.135.0>,
 <0.134.0>,<0.133.0>,<0.132.0>,<0.131.0>,<0.130.0>,<0.129.0>,
 <0.128.0>,<0.127.0>,<0.126.0>,<0.125.0>,<0.124.0>,<0.123.0>,
 <0.122.0>,<0.121.0>,<0.120.0>]

เราจะทดลองกระจายงานให้หลายโปรเซสทำ เพื่อจะทดลองว่าบนเครื่องที่มีหลาย core ความเร็วจะเพิ่มขึ้นอย่างไร

อย่างไรก็ตามเราต้องเรียก erlang ให้ทำงานในแบบ SMP (symmetric multiprocessing) ก่อน โดยเรียก

erl -smp

แทนที่จะเรียก erl ธรรมดา

ให้เขียนฟังก์ชันต่อไปนี้ (สำหรับคนที่อยากจะทดลองอย่างเดียวสามารถเข้าไปดูโปรแกรมของฟังก์ชันเหล่านี้ได้เลย)

หมายเหตุ ฟังก์ชันมาตรฐานเกี่ยวกับลิสต์สามารถดูได้ที่หน้าเอกสารอ้างอิงของ erlang

จากนั้นเขียนฟังก์ชัน check_list_par/2 ที่รับรายการของจำนวนเต็ม L และจำนวนเต็ม N จากนั้นให้แบ่ง L ออกเป็น N ส่วน, spawn process N process, แล้วส่งรายการย่อยแต่ละอันใหักับโปรเซสต่าง ๆ จนครบ แล้วรอรับผลลัพธ์จากทุกโปรเซส (คำตอบหนึ่ง)

ฟังก์ชันด้านล่างเรียกใช้ check_list_par ให้ทดสอบรายการของตัวเลขจาก 1 ถึง 10000000 พร้อมทั้งแสดงเวลา (เป็นมิลลิวินาที) ให้ทดลองเรียกใช้โดยเปลี่ยนค่า N เป็นค่าต่าง ๆ (ควรทดลองกับเครื่องที่มีหลาย core, ควรทดลองเปลี่ยนค่าให้ N มากกว่าจำนวน core ด้วย)

หมายเหตุ:

  1. ในการทดลอง ถ้าเครื่องเร็วให้ทดลองเพิ่มขอบเขตตัวเลข (ให้มากกว่า 10000000)
  2. บางทีเมื่อเพิ่ม process แล้วแต่ runtime จะยังเท่าเดิม (เพราะคิดรวมทุก process) แต่ wall clock จะลดลง ถ้าเรามีหลาย core
  3. ในการเรียกใช้โปรแกรมที่ให้มาสำหรับทดลอง อย่าลืมใส่ฟังก์ชันลงในโมดูลให้ถูกต้องด้วย กล่าวคือฟังก์ชัน split_list อยู่ในโมดูล pex, ส่วน send_all, receive_all, check_list_par อยู่ในโมดูล prime และอย่าลืม c(pex). และ c(prime). ก่อนทดลอง
run_par(N) ->
    statistics(runtime),
    statistics(wall_clock),
    check_list_par(lists:seq(1,1000000,1),N),
    {_,Time1} = statistics(runtime),
    {_,Time2} = statistics(wall_clock),
    io:format("runtime: ~p, wall clock: ~p\n",[Time1,Time2]).