เราจะทดลองเขียนโปรแกรมแบบ concurrent บน Erlang
การบ้านจะอยู่ปนกับส่วนโปรแกรมที่ให้ทดลอง ด้านบนนี้จะรวมลิงก์ไปยังการบ้านแต่ละข้อที่อยู่ในเนื้อหาด้านล่าง
ก่อนอื่นเราทดลองฟังก์ชันสำหรับแสดงผลลัพธ์ก่อน โดยฟังก์ชัน io:format
ลักษณะการใช้งานคล้ายกับฟังก์ชัน printf
อาร์กิวเมนต์แรกของฟังก์ชันคือรูปแบบการแสดงผล โดยรูปแบบครอบจักรวาลที่เราจะใช้คือ
~w
ซึ่งแสดงข้อมูลทุกรูปแบบ อาร์กิวเมนต์ที่สองเป็นลิสต์ของข้อมูลที่ต้องการพิมพ์
ทดลองใช้เช่น
io:format("X = ~w\n",[10]).
io:format("I got: ~w\n",[{this,is,a,tuple}]).
เราจะเขียนโมดูล pex.erl
(ย่อมาจาก process example, จะใช้ชื่ออื่นก็ได้
อย่าลืมเปลี่ยนในส่วนต่อ ๆ ไปด้วย)
-module(pex). -export([gp/0]). gp() -> receive Any -> io:format("I got: ~w\n",[Any]) end.
ทดลองใช้โดยสั่งดังตัวอย่างด้านล่าง
>>> c(pex). # บรรทัดที่ขึ้นด้วย >>> คือบรรทัดที่พิมพ์ {ok,pex} >>> Pid = spawn(pex,gp,[]). # สร้างโปรเซสใหม่ให้เรียกฟังก์ชัน pex:gp ทำงาน โดยส่ง argument ว่างไป <0.51.0> >>> Pid ! hello. # ส่ง hello ไป I got: hello # โปรเซสพิมพ์ออกมา hello # การส่ง (!) คืนค่าเป็น message >>> Pid ! hello. # ส่งซ้ำ hello # นิ่ง >>> Pid ! hello. # ส่งซ้ำ hello # นิ่ง
สังเกตว่าในรอบถัดไปที่เราส่ง hello
ไป โปรเซสไม่ได้ทำงานอะไรแล้ว ทั้งนี้เนื่องจากเมื่อรับค่าได้การทำงานก็หมดลง
เราจะให้วนกลับมารับค่าใหม่ได้โดยการเรียกตัวเอง ฟังก์ชัน gp ที่รับค่าได้เรื่อย ๆ แสดงดังด้านล่าง แก้แล้วทดลองคอมไพล์ใหม่ (สั่ง c(pex).
)
gp() -> receive Any -> io:format("I got: ~w\n",[Any]), gp() end.
อย่าลืมว่าถ้าจะ spawn ใหม่ ต้องเก็บค่า process id ไว้ในตัวแปรอื่น (ตัวแปร Pid ถูกใช้ไปแล้ว) ถ้าเราสั่ง Pid = spawn…
อีก การ matching จะล้มเหลว
เมื่อเราสั่ง receive โปรเซสจะหยุดรอ และจะทำงานเมื่อมี message ที่ match ได้กับ pattern ส่งมาหามัน เราจะทดลองสร้างโปรเซสง่ายในการบวกเลขและแสดงผล โดยการทำงานคร่าว ๆ จะเป็นดังนี้ โปรเซสดังกล่าวจะรับ message ในรูปแบบ {add, Value}
เพื่อเพิ่มค่าผลรวม หรือ message output
เพื่อสั่งให้พิมพ์ผลรวม
สังเกตว่าใน Erlang เราไม่มีแนวคิดของตัวแปรที่เก็บค่า “การเพิ่มค่า” ที่เรากล่าวถึงในข้างต้นจะต้องทำผ่านทางกระบวนการอื่น เช่นการเปลี่ยนค่าเมื่อเรียกตัวเอง ดังตัวอย่างด้านล่าง
adder(C) -> receive {add,Val} -> adder(C+Val); output -> io:format("~w\n",[C]), adder(C) end.
หมายเหตุ: สังเกตรูปแบบการเขียน receive เราจะมี pattern, ภายใน pattern จะขั้นแต่ละขั้นตอนด้วย ,
คั่นระหว่าง pattern ด้วย ;
และไม่มีการคั่นใน pattern สุดท้าย
จากตัวอย่างข้างต้น เราสามารถคิดได้ว่าฟังก์ชันจะใช้อาร์กิวเมนต์ C
ในการเก็บค่าผลรวมปัจจุบัน (สังเกตว่าจะมีการปรับค่าเมื่อเรียก add แต่เมื่อเรียก output จะไม่มีการเปลี่ยนแปลงใด ๆ
ในการสร้างโปรเซสที่ทำงานด้วยฟังก์ชัน adder/1
เราจะต้องใส่ค่าให้ C ด้วย ซึ่งโดยปกติแล้วเรามักเริ่มที่ 0 ดังนั้นเพื่อความสะดวกเราจะสร้างฟังก์ชัน adder/0
ที่เรียกให้เราอัตโนมัติ
adder() -> adder(0).
เมื่อเพิ่มฟังก์ชันทั้งสองลงใน pex.erl
แล้ว เราจะสามารถทดลองได้ดังด้านล่าง
>>> c(pex). {ok,pex} >>> P10 = spawn(pex,adder,[]). <0.105.0> >>> P10 ! output. 0 output >>> P10 ! {add,10}. {add,10} >>> P10 ! {add,1000}. {add,1000} >>> P10 ! {add,50}. {add,50} >>> P10 ! output. 1060 output
สิ่งที่ยากที่สุดเรื่องหนึ่งในการเขียนโปรแกรมแบบ Concurrent คือการจัดการกับปัญหา (หรือการ debug) ในการเขียนโปรแกรมด้วย Erlang ก็เช่นเดียวกัน แม้ว่าอีกสักพักเราจะพัฒนารายการของฟังก์ชันสำหรับรับส่งที่ช่วยลดปัญหาไปได้หลายเรื่อง เราก็ยังอาจพบปัญหาอื่น ๆ อยู่ดี
ในส่วนนี้จะรวมรายการสำหรับตรวจสอบโปรแกรมเพื่อหาข้อผิดพลาดต่าง ๆ
hello
, welcome
เป็นต้นX = 10.
ไม่ใช่การกำหนดค่า 10
ให้กับตัวแปร X
แต่เป็น พยายาม ผูกค่า 10 เข้ากับ X
และจะผูกได้ถ้า X
ไม่ได้มีค่ามาก่อน หรือมีค่าเป็น 10 อยู่แล้วเท่านั้น
เขียนโมดูล stk
ที่มีฟังก์ชัน stack
ที่เมื่อสั่งให้ spawn process ที่ทำงานด้วยฟังก์ชันดังกล่าวแล้ว โปรเซสจะรองรับ message เหล่านี้
{push,Val}
ใส่ Val
ลงบนตอนบนของ stackpop
แสดงค่าบนสุดของ stack และลบค่านั้นออกจาก stackเมื่อเริ่มต้นให้ stack มีสถานะว่าง
ตัวอย่างการทำงานเช่น
1> c(stk). {ok,stk} 2> P = spawn(stk,stack,[]). <0.38.0> 3> P ! {push,10}. {push,10} 4> P ! {push,20}. {push,20} 5> P ! pop. 20 pop 6> P ! pop. 10 pop 7> P ! {push,100}. {push,100} 8> P ! pop. 100 pop
สังเกตว่าถ้าเราสั่ง pop
มากเกินจะได้ error เช่นดังด้านล่าง
9> P ! pop. =ERROR REPORT==== 24-Sep-2009::14:08:38 === Error in process <0.38.0> with exit value: {{badmatch,[]},[{stk,stack,1}]}
ปกติแล้วโปรเซสจะไม่ตอบอะไรออกมาโดยการพิมพ์ (เช่นที่เราทำในกรณีของ adder) แต่จะส่งข้อมูลกลับมายังโปรเซสที่ส่งไปถาม อย่างไรก็ตามโปรเซสเมื่อรับ message มาจะไม่สามารถทราบว่าใครส่งมาให้โดยอัตโนมัติ ดังนั้นผู้ส่ง message จะต้องระบุ process ของตนเองไปด้วย ซึ่งทำได้โดยการเรียกฟังก์ชัน self()
ฟังก์ชัน adder
ในโมดูล pex
ของเราจะเปลี่ยนไปดังนี้
adder() -> adder(0). adder(C) -> receive {Pid,add,Val} -> Pid ! {self(),ok}, adder(C+Val); {Pid,output} -> Pid ! {self(),C}, adder(C) end.
สังเกตว่า adder
จะรับ message ที่ขึ้นต้นด้วย process id สำหรับส่งกลับ ดังนั้นการจะส่ง message ให้ adder เราจะต้องใส่หมายเลขโปรเซสเข้าไปด้วย เช่น
11> c(pex). {ok,pex} 12> P1 = spawn(pex,adder,[]). <0.53.0> 13> P1 ! {self(),add,100}. {<0.31.0>,add,100}
ผลลัพธ์ที่ได้จะส่งกลับมาที่โปรเซส เราสามารถอ่านได้โดยสั่ง
receive Any -> Any end.
อย่างไรก็ตาม ถ้าเราสั่ง
P1 ! {self(),output}.
แล้วสั่ง
receive Any -> Any end.
โปรเซสเราจะค้าง!
คำถามย่อย: โปรแกรม adder เราทำงานได้ถูกต้องแล้ว แต่ว่าการอ่านค่าของเราผิด เราสั่ง receive ผิดอย่างไร?
วิธีการเอาตัวเองออกจากการค้างให้กด ^G
เราจะเข้าสู่ mode พิเศษ โดยดู help ได้โดยกด h
ให้กด j
เพื่อ list job (น่าจะพบ job เดียว); จากนั้นสั่ง k 1
เพื่อ kill job นั้น; กด s
เพื่อสร้าง shell; กด j
จะเห็น job 2 เป็น shell ใหม่ที่เราสร้า; และสุดท้ายกด c 2
เพื่อเชื่อมกับ shell ใหม่นั้นกลับไปทำงานอื่น ๆ ต่อ
เราสามารถเพิ่มส่วน after
เพื่อระบุค่าที่จะคืนเมื่อรอรับนานเกินกว่าเวลาที่กำหนด (เป็นมิลลิวินาที) ตัวอย่างการใช้เช่น
receive hello -> goodbye after 1000 -> byebye end.
โปรแกรมข้างต้นจะรอรับ message hello
เป็นเวลา 1 วินาที ถ้ามีจะคืนค่า goodbye
ถ้าไม่เช่นนั้นจะคืนค่า byebye
ดังนั้นเพื่อป้องกันการค้าง (block) เราสามารถสั่ง
receive Any -> Any after 0 -> error end.
ได้
เรามาทดลองต่อกับ adder
2> P1 = spawn(pex,adder,[]). # สร้าง process ใหม่ <0.63.0> 3> receive Any -> Any after 0 -> error end. # ทดลองรับดู error # ไม่มีของ 6> P1 ! {self(),add,100}. {<0.60.0>,add,100} 7> receive Any -> Any after 0 -> error end. # รับ {<0.63.0>,ok} # ได้ ok 8> P1 ! {self(),add,200}. # ลอง add ใหม่ {<0.60.0>,add,200} 9> receive Any -> Any after 0 -> error end. # รับ {<0.63.0>,ok} # ได้ ok 10> P1 ! {self(),output}. # เรียก output {<0.60.0>,output} 11> receive Any -> Any after 0 -> error end. # รับ error # error 13> P1 ! {self(),add,200}. # ส่ง add {<0.60.0>,add,200} 14> receive Any -> Any after 0 -> error end. # รับ {<0.63.0>,ok} # กลับมารับได้!
แต่ถ้าเราลองส่ง output
ไปอีก ก็จะได้ error
เช่นเดิมไปเรื่อย ๆ
Hint:
15> Any. {<0.63.0>,ok}
การจะส่ง message โดยต้องเรียกและรอรับเองนั้นค่อนข้างยุ่งยาก เรามาเขียนฟังก์ชันเพิ่มเติมให้การเรียก adder ทำได้ง่ายกัน
ฟังก์ชัน call ของเราจะเป็นดังนี้
call(Pid,Msg) -> Pid ! {self(),Msg}, receive {Pid,Res} -> Res after 1000 -> timeout end.
สังเกตว่าเมื่อเรียกเสร็จจะมีการ receive โดยรอรับ message ที่ขึ้นต้นด้วยหมายเลข process ที่เราส่งไปเท่านั้น เราใส่ after
ไปด้วย เพื่อจัดการกรณีไม่มี message ส่งกลับมา
ฟังก์ชันดังกล่าวก่อนจะใช้งานกับ adder
ได้ต้องปรับฟังก์ชัน adder ให้รูปแบบของ message อยู่ในรูปแบบเดียวกันก่อน (สังเกตว่าตอนนี้ add รับแบบหนึ่งส่วน output รับอีกแบบหนึ่ง)
ฟังก์ชันที่แก้แล้วพร้อมด้วยฟังก์ชัน
adder() -> adder(0). adder(C) -> receive {Pid,{add,Val}} -> Pid ! {self(),ok}, adder(C+Val); {Pid,output} -> Pid ! {self(),C}, adder(C) end.
ทดลองใช้ได้ดังนี้
18> P2 = spawn(pex,adder,[]). <0.88.0> 23> pex:call(P2,{add,100}). ok 24> pex:call(P2,{add,100}). ok 25> pex:call(P2,output). 200 26> pex:call(P2,{add,1000}). ok 27> pex:call(P2,output). 1200 28>
สังเกตว่าฟังก์ชัน call ด้านบนทำงานไม่ได้ทำงานได้กับ adder
เพียงอย่างเดียว ให้แก้โมดูล stk
เดิม โดยแก้ฟังก์ชัน stack
ให้ทำงานได้กับฟังก์ชัน call
ด้านบน (ไม่ต้องเขียนฟังก์ชัน pex:call
ใหม่)
ตัวอย่างเช่น
28> c(stk). {ok,stk} 29> Ps = spawn(stk,stack,[]). <0.110.0> 30> pex:call(Ps,{push,10}). ok 31> pex:call(Ps,{push,20}). ok 32> pex:call(Ps,{push,30}). ok 33> pex:call(Ps,pop). 30 34> pex:call(Ps,pop). 20 35> pex:call(Ps,{push,100}). ok 36> pex:call(Ps,pop). 100 37> pex:call(Ps,pop). 10
และสังเกตว่าถ้าโปรเซสตายไปก่อน ฟังก์ชัน pex:call
เราจะคืนค่าเป็น timeout
38> pex:call(Ps,pop). =ERROR REPORT==== 24-Sep-2009::15:32:10 === Error in process <0.110.0> with exit value: {{badmatch,[]},[{stk,stack,1}]} timeout
ในส่วนนี้เราจะทดลองสร้างกลุ่มของโปรเซสเพื่อประมวลผลงานและวัดประสิทธิภาพที่เพิ่มขึ้นในกรณีที่เครื่องมี cpu แบบหลาย core
ให้เขียนฟังก์ชัน is_prime
ในโมดูล prime
ฟังก์ชันดังกล่าวรับจำนวนเต็มแล้วคือ true
หรือ false
ขึ้นกับว่าจำนวนเต็มดังกล่าวเป็นจำนวนเฉพาะหรือไม่
ตัวอย่างการใช้งาน
7> prime:is_prime(11). true 8> prime:is_prime(10). false 9> prime:is_prime(2). true 10> prime:is_prime(1020391). false 11> prime:is_prime(127). true
ฟังก์ชันด้านล่าง เรียกใช้ is_prime
กับลิสต์ของจำนวนเต็มตั้งแต่ 1 ถึง 1000000 แล้วแสดงเวลาการทำงาน
run_seq() -> statistics(runtime), lists:map(fun prime:is_prime/1,lists:seq(1,1000000,1)), {_,Time} = statistics(runtime), io:format("~p\n",[Time]).
ฟังก์ชัน statistics
จะเก็บสถิติต่าง ๆ ของการทำงาน, ฟังก์ชัน lists:map
เรียกฟังก์ชันกับสมาชิกทุกตัวของลิสต์
ให้เขียนฟังก์ชัน check
ในโมดูล prime สำหรับสร้างโปรเซสที่รับลิสต์ของจำนวนเต็มแล้วส่งลิสต์ของ boolean ที่ระบุว่าจำนวนเต็มใดเป็นจำนวนเฉพาะกลับไป
โปรเซสที่สร้างจากฟังก์ชัน check
ดังกล่าวต้องทำงานได้กับฟังก์ชัน pex:call
ด้านบน เช่น
27> Pp = spawn(prime,check,[]). <0.95.0> 28> pex:call(Pp,[1,2,3,4,5,6,7,8,9,10]). [false,true,true,false,true,false,true,false,false,false]
ในส่วนนี้เราจะทดลองเขียนโปรแกรมที่ทำงานหลายโปรเซสเพื่อทดลองทำงานบนเครื่องที่ทำงานแบบหลาย core
ส่วนเขียนโปรแกรมในข้อนี้เป็นโบนัส (คิดเป็นข้อที่ 5) ไม่จำเป็นต้องทำ แต่ควรทดลองเป็นอย่างยิ่ง ถ้าไม่ต้องการเขียนในส่วนใดสามารถเข้าไปดูเฉลยได้ทันที
ด้านล่างคือฟังก์ชัน spawn_n
ที่สร้างลิสต์ของโปรเซส N โปรเซส จากฟังก์ชันที่ระบุ
spawn_n(0,_,_,_,L) -> L; spawn_n(N,Mod,Fun,Args,L) -> Pid = spawn(Mod,Fun,Args), spawn_n(N-1,Mod,Fun,Args,[Pid|L]). spawn_n(N,Mod,Fun,Args) -> spawn_n(N,Mod,Fun,Args,[]).
ให้เพิ่มฟังก์ชันดังกล่าวลงในโมดูล pex
และทดลองเรียกใช้
34> Pids = pex:spawn_n(20,prime,check,[]). [<0.139.0>,<0.138.0>,<0.137.0>,<0.136.0>,<0.135.0>, <0.134.0>,<0.133.0>,<0.132.0>,<0.131.0>,<0.130.0>,<0.129.0>, <0.128.0>,<0.127.0>,<0.126.0>,<0.125.0>,<0.124.0>,<0.123.0>, <0.122.0>,<0.121.0>,<0.120.0>]
เราจะทดลองกระจายงานให้หลายโปรเซสทำ เพื่อจะทดลองว่าบนเครื่องที่มีหลาย core ความเร็วจะเพิ่มขึ้นอย่างไร
อย่างไรก็ตามเราต้องเรียก erlang
ให้ทำงานในแบบ SMP (symmetric multiprocessing) ก่อน โดยเรียก
erl -smp
แทนที่จะเรียก erl
ธรรมดา
ให้เขียนฟังก์ชันต่อไปนี้ (สำหรับคนที่อยากจะทดลองอย่างเดียวสามารถเข้าไปดูโปรแกรมของฟังก์ชันเหล่านี้ได้เลย)
send_all
ที่รับรายการของ process id และรายการของ message จากนั้นส่ง message แต่ละอันให้กับ process ที่ตรงกัน (เช่น ถ้าสั่ง send_all([P1,P2],[M1,M2])
message M1 จะถูกส่งให้ P1 และ M2 ถูกส่งให้ P2) (คำตอบหนึ่ง)receive_all
ที่รับลิสต์ของ process id จากนั้นไล่ receive ผลลัพธ์จากแต่ละ process แล้วคืนผลลัพธ์เป็นลิสต์ของสิ่งที่รับได้ (คำตอบหนึ่ง)หมายเหตุ ฟังก์ชันมาตรฐานเกี่ยวกับลิสต์สามารถดูได้ที่หน้าเอกสารอ้างอิงของ erlang
จากนั้นเขียนฟังก์ชัน check_list_par/2
ที่รับรายการของจำนวนเต็ม L และจำนวนเต็ม N จากนั้นให้แบ่ง L ออกเป็น N ส่วน, spawn process N process, แล้วส่งรายการย่อยแต่ละอันใหักับโปรเซสต่าง ๆ จนครบ แล้วรอรับผลลัพธ์จากทุกโปรเซส (คำตอบหนึ่ง)
ฟังก์ชันด้านล่างเรียกใช้ check_list_par
ให้ทดสอบรายการของตัวเลขจาก 1 ถึง 10000000 พร้อมทั้งแสดงเวลา (เป็นมิลลิวินาที) ให้ทดลองเรียกใช้โดยเปลี่ยนค่า N
เป็นค่าต่าง ๆ (ควรทดลองกับเครื่องที่มีหลาย core, ควรทดลองเปลี่ยนค่าให้ N มากกว่าจำนวน core ด้วย)
หมายเหตุ:
split_list
อยู่ในโมดูล pex
, ส่วน send_all
, receive_all
, check_list_par
อยู่ในโมดูล prime
และอย่าลืม c(pex).
และ c(prime).
ก่อนทดลองrun_par(N) -> statistics(runtime), statistics(wall_clock), check_list_par(lists:seq(1,1000000,1),N), {_,Time1} = statistics(runtime), {_,Time2} = statistics(wall_clock), io:format("runtime: ~p, wall clock: ~p\n",[Time1,Time2]).