เราจะทดลองเขียนโปรแกรมแบบ concurrent บน Erlang การบ้านจะอยู่ปนกับส่วนโปรแกรมที่ให้ทดลอง ด้านบนนี้จะรวมลิงก์ไปยังการบ้านแต่ละข้อที่อยู่ในเนื้อหาด้านล่าง * [[:204435-53:erlang#การบ้าน 1. Stack|การบ้าน 1. Stack]] * [[:204435-53:erlang#การบ้าน 2. Stack ที่ติดต่อโดยการส่ง message|การบ้าน 2. Stack ที่ติดต่อโดยการส่ง message]] * [[:204435-53:erlang#การบ้าน 3. ฟังก์ชัน is_prime|การบ้าน 3. ฟังก์ชัน is_prime]] * [[:204435-53:erlang#การบ้าน 4. โปรเซสสำหรับตรวจสอบจำนวนเฉพาะ|การบ้าน 4. โปรเซสสำหรับตรวจสอบจำนวนเฉพาะ]] * [[:204435-53:erlang#การทำงานแบบหลายโปรเซส|การบ้าน 5. รับและกระจายงาน (โบนัส)]] ---- ====== การแสดงผลและการสร้างโปรเซส ====== ก่อนอื่นเราทดลองฟังก์ชันสำหรับแสดงผลลัพธ์ก่อน โดยฟังก์ชัน ''io:format'' ลักษณะการใช้งานคล้ายกับฟังก์ชัน ''printf'' อาร์กิวเมนต์แรกของฟังก์ชันคือรูปแบบการแสดงผล โดยรูปแบบครอบจักรวาลที่เราจะใช้คือ ''~w'' ซึ่งแสดงข้อมูลทุกรูปแบบ อาร์กิวเมนต์ที่สองเป็นลิสต์ของข้อมูลที่ต้องการพิมพ์ ทดลองใช้เช่น io:format("X = ~w\n",[10]). io:format("I got: ~w\n",[{this,is,a,tuple}]). เราจะเขียนโมดูล ''pex.erl'' (ย่อมาจาก process example, จะใช้ชื่ออื่นก็ได้ อย่าลืมเปลี่ยนในส่วนต่อ ๆ ไปด้วย) -module(pex). -export([gp/0]). gp() -> receive Any -> io:format("I got: ~w\n",[Any]) end. ทดลองใช้โดยสั่งดังตัวอย่างด้านล่าง >>> c(pex). # บรรทัดที่ขึ้นด้วย >>> คือบรรทัดที่พิมพ์ {ok,pex} >>> Pid = spawn(pex,gp,[]). # สร้างโปรเซสใหม่ให้เรียกฟังก์ชัน pex:gp ทำงาน โดยส่ง argument ว่างไป <0.51.0> >>> Pid ! hello. # ส่ง hello ไป I got: hello # โปรเซสพิมพ์ออกมา hello # การส่ง (!) คืนค่าเป็น message >>> Pid ! hello. # ส่งซ้ำ hello # นิ่ง >>> Pid ! hello. # ส่งซ้ำ hello # นิ่ง สังเกตว่าในรอบถัดไปที่เราส่ง ''hello'' ไป โปรเซสไม่ได้ทำงานอะไรแล้ว ทั้งนี้เนื่องจากเมื่อรับค่าได้การทำงานก็หมดลง เราจะให้วนกลับมารับค่าใหม่ได้โดยการเรียกตัวเอง ฟังก์ชัน gp ที่รับค่าได้เรื่อย ๆ แสดงดังด้านล่าง แก้แล้วทดลองคอมไพล์ใหม่ (สั่ง ''c(pex).'') gp() -> receive Any -> io:format("I got: ~w\n",[Any]), gp() end. อย่าลืมว่าถ้าจะ spawn ใหม่ ต้องเก็บค่า process id ไว้ในตัวแปรอื่น (ตัวแปร Pid ถูกใช้ไปแล้ว) ถ้าเราสั่ง ''Pid = spawn...'' อีก การ matching จะล้มเหลว ====== โปรเซสบวกเลข ====== เมื่อเราสั่ง receive โปรเซสจะหยุดรอ และจะทำงานเมื่อมี message ที่ match ได้กับ pattern ส่งมาหามัน เราจะทดลองสร้างโปรเซสง่ายในการบวกเลขและแสดงผล โดยการทำงานคร่าว ๆ จะเป็นดังนี้ โปรเซสดังกล่าวจะรับ message ในรูปแบบ ''{add, Value}'' เพื่อเพิ่มค่าผลรวม หรือ message ''output'' เพื่อสั่งให้พิมพ์ผลรวม สังเกตว่าใน Erlang เราไม่มีแนวคิดของตัวแปรที่เก็บค่า "การเพิ่มค่า" ที่เรากล่าวถึงในข้างต้นจะต้องทำผ่านทางกระบวนการอื่น เช่นการเปลี่ยนค่าเมื่อเรียกตัวเอง ดังตัวอย่างด้านล่าง adder(C) -> receive {add,Val} -> adder(C+Val); output -> io:format("~w\n",[C]), adder(C) end. **หมายเหตุ:** สังเกตรูปแบบการเขียน receive เราจะมี pattern, ภายใน pattern จะขั้นแต่ละขั้นตอนด้วย '','' คั่นระหว่าง pattern ด้วย '';'' และไม่มีการคั่นใน pattern สุดท้าย จากตัวอย่างข้างต้น เราสามารถคิดได้ว่าฟังก์ชันจะใช้อาร์กิวเมนต์ ''C'' ในการเก็บค่าผลรวมปัจจุบัน (สังเกตว่าจะมีการปรับค่าเมื่อเรียก add แต่เมื่อเรียก output จะไม่มีการเปลี่ยนแปลงใด ๆ ในการสร้างโปรเซสที่ทำงานด้วยฟังก์ชัน ''adder/1'' เราจะต้องใส่ค่าให้ C ด้วย ซึ่งโดยปกติแล้วเรามักเริ่มที่ 0 ดังนั้นเพื่อความสะดวกเราจะสร้างฟังก์ชัน ''adder/0'' ที่เรียกให้เราอัตโนมัติ adder() -> adder(0). เมื่อเพิ่มฟังก์ชันทั้งสองลงใน ''pex.erl'' แล้ว เราจะสามารถทดลองได้ดังด้านล่าง >>> c(pex). {ok,pex} >>> P10 = spawn(pex,adder,[]). <0.105.0> >>> P10 ! output. 0 output >>> P10 ! {add,10}. {add,10} >>> P10 ! {add,1000}. {add,1000} >>> P10 ! {add,50}. {add,50} >>> P10 ! output. 1060 output ===== เมื่อเกิดปัญหา?? ===== สิ่งที่ยากที่สุดเรื่องหนึ่งในการเขียนโปรแกรมแบบ Concurrent คือการจัดการกับปัญหา (หรือการ debug) ในการเขียนโปรแกรมด้วย Erlang ก็เช่นเดียวกัน แม้ว่าอีกสักพักเราจะพัฒนารายการของฟังก์ชันสำหรับรับส่งที่ช่วยลดปัญหาไปได้หลายเรื่อง เราก็ยังอาจพบปัญหาอื่น ๆ อยู่ดี ในส่วนนี้จะรวมรายการสำหรับตรวจสอบโปรแกรมเพื่อหาข้อผิดพลาดต่าง ๆ * ตัวแปรต้องขึ้นต้นด้วยตัวใหญ่ ถ้าไม่เช่นนั้น Erlang จะนับเป็น atom เช่น ''hello'', ''welcome'' เป็นต้น * ใน Erlang ไม่มีการให้ค่า (assignment) มีแต่การทำ pattern matching ดังนั้นการสั่ง ''X = 10.'' ไม่ใช่การกำหนดค่า ''10'' ให้กับตัวแปร ''X'' แต่เป็น //พยายาม// ผูกค่า 10 เข้ากับ ''X'' และจะผูกได้ถ้า ''X'' ไม่ได้มีค่ามาก่อน หรือมีค่าเป็น 10 อยู่แล้วเท่านั้น ===== การบ้าน 1. Stack ===== เขียนโมดูล ''stk'' ที่มีฟังก์ชัน ''stack'' ที่เมื่อสั่งให้ spawn process ที่ทำงานด้วยฟังก์ชันดังกล่าวแล้ว โปรเซสจะรองรับ message เหล่านี้ * ''{push,Val}'' ใส่ ''Val'' ลงบนตอนบนของ stack * ''pop'' แสดงค่าบนสุดของ stack และลบค่านั้นออกจาก stack เมื่อเริ่มต้นให้ stack มีสถานะว่าง ตัวอย่างการทำงานเช่น 1> c(stk). {ok,stk} 2> P = spawn(stk,stack,[]). <0.38.0> 3> P ! {push,10}. {push,10} 4> P ! {push,20}. {push,20} 5> P ! pop. 20 pop 6> P ! pop. 10 pop 7> P ! {push,100}. {push,100} 8> P ! pop. 100 pop สังเกตว่าถ้าเราสั่ง ''pop'' มากเกินจะได้ error เช่นดังด้านล่าง 9> P ! pop. =ERROR REPORT==== 24-Sep-2009::14:08:38 === Error in process <0.38.0> with exit value: {{badmatch,[]},[{stk,stack,1}]} ====== การรับส่งระหว่างโปรเซส ====== ปกติแล้วโปรเซสจะไม่ตอบอะไรออกมาโดยการพิมพ์ (เช่นที่เราทำในกรณีของ adder) แต่จะส่งข้อมูลกลับมายังโปรเซสที่ส่งไปถาม อย่างไรก็ตามโปรเซสเมื่อรับ message มาจะไม่สามารถทราบว่าใครส่งมาให้โดยอัตโนมัติ ดังนั้นผู้ส่ง message จะต้องระบุ process ของตนเองไปด้วย ซึ่งทำได้โดยการเรียกฟังก์ชัน ''self()'' ฟังก์ชัน ''adder'' ในโมดูล ''pex'' ของเราจะเปลี่ยนไปดังนี้ adder() -> adder(0). adder(C) -> receive {Pid,add,Val} -> Pid ! {self(),ok}, adder(C+Val); {Pid,output} -> Pid ! {self(),C}, adder(C) end. สังเกตว่า ''adder'' จะรับ message ที่ขึ้นต้นด้วย process id สำหรับส่งกลับ ดังนั้นการจะส่ง message ให้ adder เราจะต้องใส่หมายเลขโปรเซสเข้าไปด้วย เช่น 11> c(pex). {ok,pex} 12> P1 = spawn(pex,adder,[]). <0.53.0> 13> P1 ! {self(),add,100}. {<0.31.0>,add,100} ผลลัพธ์ที่ได้จะส่งกลับมาที่โปรเซส เราสามารถอ่านได้โดยสั่ง receive Any -> Any end. อย่างไรก็ตาม ถ้าเราสั่ง P1 ! {self(),output}. แล้วสั่ง receive Any -> Any end. โปรเซสเราจะค้าง! **คำถามย่อย**: โปรแกรม adder เราทำงานได้ถูกต้องแล้ว แต่ว่าการอ่านค่าของเราผิด เราสั่ง receive ผิดอย่างไร? วิธีการเอาตัวเองออกจากการค้างให้กด ''^G'' เราจะเข้าสู่ mode พิเศษ โดยดู help ได้โดยกด ''h'' ให้กด ''j'' เพื่อ list job (น่าจะพบ job เดียว); จากนั้นสั่ง ''k 1'' เพื่อ kill job นั้น; กด ''s'' เพื่อสร้าง shell; กด ''j'' จะเห็น job 2 เป็น shell ใหม่ที่เราสร้า; และสุดท้ายกด ''c 2'' เพื่อเชื่อมกับ shell ใหม่นั้นกลับไปทำงานอื่น ๆ ต่อ ===== การ receive แบบมี timeout ===== เราสามารถเพิ่มส่วน ''after'' เพื่อระบุค่าที่จะคืนเมื่อรอรับนานเกินกว่าเวลาที่กำหนด (เป็นมิลลิวินาที) ตัวอย่างการใช้เช่น receive hello -> goodbye after 1000 -> byebye end. โปรแกรมข้างต้นจะรอรับ message ''hello'' เป็นเวลา 1 วินาที ถ้ามีจะคืนค่า ''goodbye'' ถ้าไม่เช่นนั้นจะคืนค่า ''byebye'' ดังนั้นเพื่อป้องกันการค้าง (block) เราสามารถสั่ง receive Any -> Any after 0 -> error end. ได้ เรามาทดลองต่อกับ adder 2> P1 = spawn(pex,adder,[]). # สร้าง process ใหม่ <0.63.0> 3> receive Any -> Any after 0 -> error end. # ทดลองรับดู error # ไม่มีของ 6> P1 ! {self(),add,100}. {<0.60.0>,add,100} 7> receive Any -> Any after 0 -> error end. # รับ {<0.63.0>,ok} # ได้ ok 8> P1 ! {self(),add,200}. # ลอง add ใหม่ {<0.60.0>,add,200} 9> receive Any -> Any after 0 -> error end. # รับ {<0.63.0>,ok} # ได้ ok 10> P1 ! {self(),output}. # เรียก output {<0.60.0>,output} 11> receive Any -> Any after 0 -> error end. # รับ error # error 13> P1 ! {self(),add,200}. # ส่ง add {<0.60.0>,add,200} 14> receive Any -> Any after 0 -> error end. # รับ {<0.63.0>,ok} # กลับมารับได้! แต่ถ้าเราลองส่ง ''output'' ไปอีก ก็จะได้ ''error'' เช่นเดิมไปเรื่อย ๆ Hint: 15> Any. {<0.63.0>,ok} ===== ฟังก์ชันสำหรับเรียกใช้งาน ===== การจะส่ง message โดยต้องเรียกและรอรับเองนั้นค่อนข้างยุ่งยาก เรามาเขียนฟังก์ชันเพิ่มเติมให้การเรียก adder ทำได้ง่ายกัน ฟังก์ชัน call ของเราจะเป็นดังนี้ call(Pid,Msg) -> Pid ! {self(),Msg}, receive {Pid,Res} -> Res after 1000 -> timeout end. สังเกตว่าเมื่อเรียกเสร็จจะมีการ receive โดยรอรับ message ที่ขึ้นต้นด้วยหมายเลข process ที่เราส่งไปเท่านั้น เราใส่ ''after'' ไปด้วย เพื่อจัดการกรณีไม่มี message ส่งกลับมา ฟังก์ชันดังกล่าวก่อนจะใช้งานกับ ''adder'' ได้ต้องปรับฟังก์ชัน adder ให้รูปแบบของ message อยู่ในรูปแบบเดียวกันก่อน (สังเกตว่าตอนนี้ add รับแบบหนึ่งส่วน output รับอีกแบบหนึ่ง) ฟังก์ชันที่แก้แล้วพร้อมด้วยฟังก์ชัน adder() -> adder(0). adder(C) -> receive {Pid,{add,Val}} -> Pid ! {self(),ok}, adder(C+Val); {Pid,output} -> Pid ! {self(),C}, adder(C) end. ทดลองใช้ได้ดังนี้ 18> P2 = spawn(pex,adder,[]). <0.88.0> 23> pex:call(P2,{add,100}). ok 24> pex:call(P2,{add,100}). ok 25> pex:call(P2,output). 200 26> pex:call(P2,{add,1000}). ok 27> pex:call(P2,output). 1200 28> ===== การบ้าน 2. Stack ที่ติดต่อโดยการส่ง message ===== สังเกตว่าฟังก์ชัน call ด้านบนทำงานไม่ได้ทำงานได้กับ ''adder'' เพียงอย่างเดียว ให้แก้โมดูล ''stk'' เดิม โดยแก้ฟังก์ชัน ''stack'' ให้ทำงานได้กับฟังก์ชัน ''call'' ด้านบน (ไม่ต้องเขียนฟังก์ชัน ''pex:call'' ใหม่) ตัวอย่างเช่น 28> c(stk). {ok,stk} 29> Ps = spawn(stk,stack,[]). <0.110.0> 30> pex:call(Ps,{push,10}). ok 31> pex:call(Ps,{push,20}). ok 32> pex:call(Ps,{push,30}). ok 33> pex:call(Ps,pop). 30 34> pex:call(Ps,pop). 20 35> pex:call(Ps,{push,100}). ok 36> pex:call(Ps,pop). 100 37> pex:call(Ps,pop). 10 และสังเกตว่าถ้าโปรเซสตายไปก่อน ฟังก์ชัน ''pex:call'' เราจะคืนค่าเป็น ''timeout'' 38> pex:call(Ps,pop). =ERROR REPORT==== 24-Sep-2009::15:32:10 === Error in process <0.110.0> with exit value: {{badmatch,[]},[{stk,stack,1}]} timeout ====== กลุ่มของโปรเซสเพื่อประมวลผลงาน ====== ในส่วนนี้เราจะทดลองสร้างกลุ่มของโปรเซสเพื่อประมวลผลงานและวัดประสิทธิภาพที่เพิ่มขึ้นในกรณีที่เครื่องมี cpu แบบหลาย core ===== การบ้าน 3. ฟังก์ชัน is_prime ===== ให้เขียนฟังก์ชัน ''is_prime'' ในโมดูล ''prime'' ฟังก์ชันดังกล่าวรับจำนวนเต็มแล้วคือ ''true'' หรือ ''false'' ขึ้นกับว่าจำนวนเต็มดังกล่าวเป็นจำนวนเฉพาะหรือไม่ ตัวอย่างการใช้งาน 7> prime:is_prime(11). true 8> prime:is_prime(10). false 9> prime:is_prime(2). true 10> prime:is_prime(1020391). false 11> prime:is_prime(127). true ฟังก์ชันด้านล่าง เรียกใช้ ''is_prime'' กับลิสต์ของจำนวนเต็มตั้งแต่ 1 ถึง 1000000 แล้วแสดงเวลาการทำงาน run_seq() -> statistics(runtime), lists:map(fun prime:is_prime/1,lists:seq(1,1000000,1)), {_,Time} = statistics(runtime), io:format("~p\n",[Time]). ฟังก์ชัน ''statistics'' จะเก็บสถิติต่าง ๆ ของการทำงาน, ฟังก์ชัน ''lists:map'' เรียกฟังก์ชันกับสมาชิกทุกตัวของลิสต์ ===== การบ้าน 4. โปรเซสสำหรับตรวจสอบจำนวนเฉพาะ ===== ให้เขียนฟังก์ชัน ''check'' ในโมดูล prime สำหรับสร้างโปรเซสที่รับลิสต์ของจำนวนเต็มแล้วส่งลิสต์ของ boolean ที่ระบุว่าจำนวนเต็มใดเป็นจำนวนเฉพาะกลับไป โปรเซสที่สร้างจากฟังก์ชัน ''check'' ดังกล่าวต้องทำงานได้กับฟังก์ชัน ''pex:call'' ด้านบน เช่น 27> Pp = spawn(prime,check,[]). <0.95.0> 28> pex:call(Pp,[1,2,3,4,5,6,7,8,9,10]). [false,true,true,false,true,false,true,false,false,false] ===== การทำงานแบบหลายโปรเซส ===== ในส่วนนี้เราจะทดลองเขียนโปรแกรมที่ทำงานหลายโปรเซสเพื่อทดลองทำงานบนเครื่องที่ทำงานแบบหลาย core ส่วนเขียนโปรแกรมในข้อนี้เป็นโบนัส (คิดเป็นข้อที่ 5) ไม่จำเป็นต้องทำ แต่ควรทดลองเป็นอย่างยิ่ง ถ้าไม่ต้องการเขียนในส่วนใดสามารถเข้าไปดูเฉลยได้ทันที ด้านล่างคือฟังก์ชัน ''spawn_n'' ที่สร้างลิสต์ของโปรเซส N โปรเซส จากฟังก์ชันที่ระบุ spawn_n(0,_,_,_,L) -> L; spawn_n(N,Mod,Fun,Args,L) -> Pid = spawn(Mod,Fun,Args), spawn_n(N-1,Mod,Fun,Args,[Pid|L]). spawn_n(N,Mod,Fun,Args) -> spawn_n(N,Mod,Fun,Args,[]). ให้เพิ่มฟังก์ชันดังกล่าวลงในโมดูล ''pex'' และทดลองเรียกใช้ 34> Pids = pex:spawn_n(20,prime,check,[]). [<0.139.0>,<0.138.0>,<0.137.0>,<0.136.0>,<0.135.0>, <0.134.0>,<0.133.0>,<0.132.0>,<0.131.0>,<0.130.0>,<0.129.0>, <0.128.0>,<0.127.0>,<0.126.0>,<0.125.0>,<0.124.0>,<0.123.0>, <0.122.0>,<0.121.0>,<0.120.0>] เราจะทดลองกระจายงานให้หลายโปรเซสทำ เพื่อจะทดลองว่าบนเครื่องที่มีหลาย core ความเร็วจะเพิ่มขึ้นอย่างไร อย่างไรก็ตามเราต้องเรียก ''erlang'' ให้ทำงานในแบบ SMP (symmetric multiprocessing) ก่อน โดยเรียก erl -smp แทนที่จะเรียก ''erl'' ธรรมดา ให้เขียนฟังก์ชันต่อไปนี้ (สำหรับคนที่อยากจะทดลองอย่างเดียวสามารถเข้าไปดูโปรแกรมของฟังก์ชันเหล่านี้ได้เลย) * ฟังก์ชัน ''split_list'' ที่รับลิสต์และจำนวนเต็ม ''N'' จากนั้นแบ่งลิสต์ออกเป็น ''N'' ลิสต์ย่อย ([[:204435-52:erlang:split_list|คำตอบหนึ่ง]]) * ฟังก์ชัน ''send_all'' ที่รับรายการของ process id และรายการของ message จากนั้นส่ง message แต่ละอันให้กับ process ที่ตรงกัน (เช่น ถ้าสั่ง ''send_all([P1,P2],[M1,M2])'' message M1 จะถูกส่งให้ P1 และ M2 ถูกส่งให้ P2) ([[:204435-52:erlang:send_all|คำตอบหนึ่ง]]) * ฟังก์ชัน ''receive_all'' ที่รับลิสต์ของ process id จากนั้นไล่ receive ผลลัพธ์จากแต่ละ process แล้วคืนผลลัพธ์เป็นลิสต์ของสิ่งที่รับได้ ([[:204435-52:erlang:receive_all|คำตอบหนึ่ง]]) **หมายเหตุ** ฟังก์ชันมาตรฐานเกี่ยวกับลิสต์สามารถดูได้ที่[[http://erlang.org/doc/man/lists.html|หน้าเอกสารอ้างอิงของ erlang]] จากนั้นเขียนฟังก์ชัน ''check_list_par/2'' ที่รับรายการของจำนวนเต็ม L และจำนวนเต็ม N จากนั้นให้แบ่ง L ออกเป็น N ส่วน, spawn process N process, แล้วส่งรายการย่อยแต่ละอันใหักับโปรเซสต่าง ๆ จนครบ แล้วรอรับผลลัพธ์จากทุกโปรเซส ([[:204435-52:erlang:check_list_par|คำตอบหนึ่ง]]) ฟังก์ชันด้านล่างเรียกใช้ ''check_list_par'' ให้ทดสอบรายการของตัวเลขจาก 1 ถึง 10000000 พร้อมทั้งแสดงเวลา (เป็นมิลลิวินาที) ให้ทดลองเรียกใช้โดยเปลี่ยนค่า ''N'' เป็นค่าต่าง ๆ (ควรทดลองกับเครื่องที่มีหลาย core, ควรทดลองเปลี่ยนค่าให้ N มากกว่าจำนวน core ด้วย) **หมายเหตุ**: - ในการทดลอง ถ้าเครื่องเร็วให้ทดลองเพิ่มขอบเขตตัวเลข (ให้มากกว่า 10000000) - บางทีเมื่อเพิ่ม process แล้วแต่ runtime จะยังเท่าเดิม (เพราะคิดรวมทุก process) แต่ wall clock จะลดลง ถ้าเรามีหลาย core - ในการเรียกใช้โปรแกรมที่ให้มาสำหรับทดลอง อย่าลืมใส่ฟังก์ชันลงในโมดูลให้ถูกต้องด้วย กล่าวคือฟังก์ชัน ''split_list'' อยู่ในโมดูล ''pex'', ส่วน ''send_all'', ''receive_all'', ''check_list_par'' อยู่ในโมดูล ''prime'' และอย่าลืม ''c(pex).'' และ ''c(prime).'' ก่อนทดลอง run_par(N) -> statistics(runtime), statistics(wall_clock), check_list_par(lists:seq(1,1000000,1),N), {_,Time1} = statistics(runtime), {_,Time2} = statistics(wall_clock), io:format("runtime: ~p, wall clock: ~p\n",[Time1,Time2]).