在介紹Thread之前,我們必須先把Program和Process這兩個觀念作一個釐清。
由上面的描述中,我們在歸納Thread的重點如下
Java以java.lang.Thread這個類別來表示Thread。Class Thread有兩個Constructor:
第一個Constrctor沒有參數,第二個需要一個Runnable物件當參數。Runnable是一個interface,定義於java.lang內,其宣告為
public interface Runnable { public void run(); }
使用Thread()產生的Thread,其進入點為Thread裡的run();使用Thread(Runnable)產生的Thread,其進入點為Runnable物件裡的run()。當run()結束時,這個Thread也就結束了;這和main()結束有相同的效果。其用法以下面範例說明:
public class ThreadExample1 extends Thread { public void run() { // override Thread's run() System.out.println("Here is the starting point of Thread."); for (;;) { // infinite loop to print message System.out.println("User Created Thread"); } } public static void main(String[] argv) { Thread t = new ThreadExample1(); // 產生Thread物件 t.start(); // 開始執行t.run() for (;;) { System.out.println("Main Thread"); } } }
以上程式執行後,螢幕上會持續印出"User Created Thread"或"Main Thread"的字樣。利用Runnable的寫法如下
public class ThreadExample2 implements Runnable { public void run() { // implements Runnable run() System.out.println("Here is the starting point of Thread."); for (;;) { // infinite loop to print message System.out.println("User Created Thread"); } } public static void main(String[] argv) { Thread t = new Thread(new ThreadExample2()); // 產生Thread物件 t.start(); // 開始執行Runnable.run(); for (;;) { System.out.println("Main Thread"); } } }
Thread.setPriority(int)可以設定Thread的優先權,數字越大優先權越高。Thread定義了3個相關的static final variable
public static final int MAX_PRIORITY 10 public static final int MIN_PRIORITY 1 public static final int NORM_PRIORITY 5
要提醒讀者的是,優先權高的Thread其佔有CPU的機會比較高,但優先權低的也都會有機會執行到。其他有關Thread執行的方法有:
你可以執行下面的程式,看看yield()的效果
public class ThreadExample1 extends Thread { public void run() { // overwrite Thread's run() System.out.println("Here is the starting point of Thread."); for (;;) { // infinite loop to print message System.out.println("User Created Thread"); yield(); } } public static void main(String[] argv) { Thread t = new ThreadExample1(); // 產生Thread物件 t.start(); // 開始執行t.run() for (;;) { System.out.println("Main Thread"); yield(); } } }
觀看join的效果
public class JoinExample extends Thread { String myId; public JoinExample(String id) { myId = id; } public void run() { // overwrite Thread's run() for (int i=0; i < 500; i++) { System.out.println(myId+" Thread"); } } public static void main(String[] argv) { Thread t1 = new JoinExample("T1"); // 產生Thread物件 Thread t2 = new JoinExample("T2"); // 產生Thread物件 t1.start(); // 開始執行t1.run() t2.start(); try { t1.join(); // 等待t1結束 t2.join(); // 等待t2結束 } catch (InterruptedException e) {} for (int i=0;i < 5; i++) { System.out.println("Main Thread"); } } }
觀看sleep的效果
public class SleepExample extends Thread { String myId; public SleepExample(String id) { myId = id; } public void run() { // overwrite Thread's run() for (int i=0; i < 500; i++) { System.out.println(myId+" Thread"); try { sleep(100); } catch (InterruptedException e) {} } } public static void main(String[] argv) { Thread t1 = new SleepExample("T1"); // 產生Thread物件 Thread t2 = new SleepExample("T2"); // 產生Thread物件 t1.start(); // 開始執行t1.run() t2.start(); } }
如果設計者沒有提供保護機制的話,Thread取得和失去CPU控制權的時機是由作業系統來決定。也就是說Thread可能在執行任何一個機器指令時,被作業系統取走CPU控制權,並交給另一個Thread。由於某些真實世界的動作是不可分割的,例如跨行轉帳X圓由A帳戶到B帳戶,轉帳前後這兩個帳戶的總金額必須相同,但以程式來實作時,卻無法用一個指令就完成,如轉帳可能要寫成下面的這一段程式碼
if (A >= X) { A = A - X; // 翻譯成3個機器指令LOAD A, SUB X, STORE A B = B +X; }
如果兩個Thread同時要存取A,B兩帳戶進行轉帳,假設當Thread one執行到SUBX後被中斷,Threadtwo接手執行完成另一個轉帳要求,然後Threadone繼續執行未完成的動作,請問這兩個轉帳動作正確嗎?我們以A=1000,B=0,分別轉帳100,200圓來說明此結果
LOAD A // Thread 1, 現在A還是1000 SUB 100 // Thread 1 LOAD A // 假設此時Thread 1被中斷,Thread 2接手, 因為Thread 1 還沒有執行STORE A, 所以變數A還是1000 SUB 200 // Thread 2 STORE A // Thread 2, A = 800 LOAD B // Thread 2, B現在是0 ADD 200 // Thread 2 STORE B // B=200 STORE A // Thread 1拿回控制權, A = 900 LOAD B // Thread 1, B = 200 ADD 100 // Thread 1 STORE B // B = 300
你會發現執行完成後A=900,B=300,也就是說銀行平白損失了200圓。當然另外的執行順序可能造成其他不正確的結果。我們把這問題再整理一下:
因此在撰寫多執行緒的程式時,必須特別考慮這種狀況(又稱為race condition)。Java的解決辦法是,JVM會在每個物件上擺一把鎖(lock),然後程式設計者可以宣告執行某一段程式(通常是用來存取共同資料結構的程式碼, 又稱為Critical Section)時,必須拿到某物件的鎖才行,這個鎖同時間最多只有一個執行緒可以擁有它。
public class Transfer extends Thread { public static Object lock = new Object(); public static int A = 1000; public static int B = 0; private int amount; public Transfer(int x) { amount = x; } public void run() { synchronized(lock) { // 取得lock,如果別的thread A已取得,則目前這個thread會等到thread A釋放該lock if (A >= amount) { A = A - amount; B = B + amount; } } // 離開synchronized區塊後,此thread會自動釋放lock } public static void main(String[] argv) { Thread t1 = new Transfer(100); Thread t2 = new Transfer(200); t1.start(); t2.start(); } }
除了synchronized(ref)的語法可以鎖定ref指到的物件外,synchronized也可以用在object method前面,表示要鎖定this物件才能執行該方法。以下是Queue結構的範例
public class Queue { private Object[] data; private int size; private int head; private int tail; public Queue(int maxLen) { data = new Object[maxLen]; } public synchronized Object deQueue() { Object tmp = data[head]; data[head] = null; head = (head+1)%data.length; size--; return tmp; } public synchronized void enQueue(Object c) { data[tail++] = c; tail %= data.length; size++; } }
雖然上面的程式正確無誤,但並未考慮資源不足時該如何處理。例如Queue已經沒有資料了,卻還想拿出來;或是Queue裡已經塞滿了資料,使用者卻還要放進去?我們當然可以使用Exception Handling的機制:
public class Queue { private Object[] data; private int size; private int head; private int tail; public Queue(int maxLen) { data = new Object[maxLen]; } public synchronized Object deQueue() throws Exception { if (size == 0) { throw new Exception(); } Object tmp = data[head]; data[head] = null; head = (head+1)%data.length; size--; return tmp; } public synchronized void enQueue(Object c) throws Exception { if (size >= maxLen) { throw new Exception(); } data[tail++] = c; tail %= data.length; size++; } }
但假設我們的執行環境是,某些Thread專門負責讀取使用者的需求,並把工作放到Queue裡面,某些Thread則專門由Queue裡抓取工作需求做進一步處理。這種架構的好處是,可以把慢速或不定速的輸入(如透過網路讀資料,連線速度可能差很多),和快速的處理分開,可使系統的反應速度更快,更節省資源。那麼以Exceptoin來處理Queue空掉或爆掉的情況並不合適,因為使用Queue的人必須處理例外狀況,並不斷的消耗CPU資源:
public class Getter extends Thread { Queue q; public Getter(Queue q) { this.q = q; } public void run() { for (;;) { try { Object data = q.deQueue(); // processing } catch(Exception e) { // if we try to sleep here, user may feel slow response // if we do not sleep, CPU will be wasted } } } } public class Putter extends Thread { Queue q; public Putter(Queue q) { this.q = q; } public void run() { for (;;) { try { Object data = null; // get user request q.enQueue(data); } catch(Exception e) { // if we try to sleep here, user may feel slow response // if we do not sleep, CPU will be wasted } } } } public class Main { public static void main(String[] argv) { Queue q = new Queue(10); Getter r1 = new Getter(q); Getter r2 = new Getter(q); Putter w1 = new Putter(q); Putter w2 = new Putter(q); r1.start(); r2.start(); w1.start(); w2.start(); } }
為了解決這類資源分配的問題,Java Object提供了下面三個method:
所謂Runnable Mode是指該Thread隨時可由作業系統分配CPU資源。Blocking Mode表示該Thread正在等待某個事件發生,作業系統不會讓這種Thread取得CPU資源。前一個Queue的範例就可以寫成:
public class Queue { private Object[] data; private int size; private int head; private int tail; public Queue(int maxLen) { data = new Object[maxLen]; } public synchronized Object deQueue() { while (size==0) { // When executing here, Thread must have got lock and be in running mode // Let current Thread wait this object(to sleeping mode) try { wait(); // to sleeping mode, and release all lock } catch(Exception ex) {}; } Object tmp = data[head]; data[head] = null; head = (head+1)%data.length; if (size==data.length) { // wake up all Threads waiting this object notifyAll(); } size--; return tmp; } // release lock public synchronized void enQueue(Object c) { while (size==data.length) { // When executing here, Thread must have got lock and be in running mode // Let current thread wait this object(to sleeping mode) try { wait(); // to sleeping mode, and release all lock } catch(Exception ex) {}; } data[tail++] = c; tail %= data.length; size++; if (size==1) { // wake up all Threads waiting this object notifyAll(); } } } public class ReaderWriter extends Thread { public static final int READER = 1; public static final int WRITER = 2; private Queue q; private int mode; public void run() { for (int i=0; i < 1000; i++) { if (mode==READER) { q.deQueue(); } else if (mode==WRITER) { q.enQueue(new Integer(i)); } } } public ReaderWriter(Queue q, int mode) { this.q = q; this.mode = mode; } public static void main(String[] args) { Queue q = new Queue(5); ReaderWriter r1, r2, w1, w2; (w1 = new ReaderWriter(q, WRITER)).start(); (w2 = new ReaderWriter(q, WRITER)).start(); (r1 = new ReaderWriter(q, READER)).start(); (r2 = new ReaderWriter(q, READER)).start(); try { w1.join(); // wait until w1 complete w2.join(); // wait until w2 complete r1.join(); // wait until r1 complete r2.join(); // wait until r2 complete } catch(InterruptedException epp) { } } }
上一節的Queue資料結構,不論是enQueue()或deQueue()都會更動到Queue的內容。而在許多應用裡,資料結構可以允許同時多個讀一個寫。本節舉出幾個不同的例子,說明多個Reader-Writer時的可能排程法。
Single Reader-Writer, 只同時允許一個執行緒存取
public class SingleReaderWriter { int n; // number of reader and write, 0 or 1 public synchronized void startReading() throws InterruptedException { while (n != 0) { wait(); } n = 1; } public synchronized void stopReading() { n = 0; notify(); } public synchronized void startWriting() throws InterruptedException { while (n != 0) { wait(); } n = 1; } public synchronized void stopWriting() { n = 0; notify(); } } // 這是一個使用範例, 程式能否正確執行要靠呼叫正確的start和stop public class WriterThread extends Thread { SingleReaderWriter srw; public WriterThread(SingleReaderWriter srw) { this.srw = srw; } public void run() { startWring(); // insert real job here stopWriting(); } } public class ReaderThread extends Thread { SingleReaderWriter srw; public ReaderThread(SingleReaderWriter srw) { this.srw = srw; } public void run() { startReading(); // insert real job here stopReading(); } } public class Test { public static void main(String[] argv) { SingleReaderWriter srw = new SingleReaderWriter; // create four threads (new WriterThread(srw)).start(); (new WriterThread(srw)).start(); (new ReaderThread(srw)).start(); (new ReaderThread(srw)).start(); } }
其他可能的策略實作如下:
Reader優先:
public class ReadersPreferredMonitor { int nr; // The number of threads currently reading, nr > = 0 int nw; // The number of threads currently writing, 0 or 1 int nrtotal; // The number of threads either reading or waiting to read, nrtotal > = nr int nwtotal; // The number of threads either writing or waiting to write public synchronized void startReading() throws InterruptedException { nrtotal++; // 想要read的thread又多了一個 while (nw != 0) { // 還有write thread正在write wait(); } nr++; // 正在讀的thread多了一個 } public synchronized void startWriting() throws InterruptedException { nwtotal++; // 想要寫的thread又多了一個 while (nrtotal+nw != 0) { // 只要有thread想要讀,或是有thread正在寫,禮讓 wait(); } nw = 1; } public synchronized void stopReading() { nr--; // 正在讀的少一個 nrtotal--; // 想要讀的少一個 if (nrtotal == 0) { // 如果沒有要讀的,叫醒想寫的 notify(); } } public synchronized void stopWriting() { nw = 0; // 沒有thread正在寫 nwtotal--; // 想寫的少一個 notifyAll(); // 叫醒所有想讀和想寫的 } }
Writer優先:
public class WritersPreferredMonitor { int nr; // The number of threads currently reading, nr > = 0 int nw; // The number of threads currently writing, 0 or 1 int nrtotal; // The number of threads either reading or waiting to read, nrtotal > = nr int nwtotal; // The number of threads either writing or waiting to write public synchronized void startReading() throws InterruptedException { nrtotal++; // 想要read的thread又多了一個 while (nwtotal != 0) { // 還有thread想要write wait(); } nr++; // 正在讀的thread多了一個 } public synchronized void startWriting() throws InterruptedException { nwtotal++; // 想要寫的thread又多了一個 while (nr+nw != 0) { // 有thread正在讀,或是有thread正在寫 wait(); } nw = 1; } public synchronized void stopReading() { nr--; // 正在讀的少一個 nrtotal--; // 想要讀的少一個 if (nr == 0) { // 如果沒有正在讀的,叫醒所有的(包括想寫的) notifyAll(); } } public synchronized void stopWriting() { nw = 0; // 沒有thread正在寫 nwtotal--; // 想寫的少一個 notifyAll(); // 叫醒所有想讀和想寫的 } }
Reader和Writer交互執行:
public class AlternatingReadersWritersMonitor { int[] nr = new int[2]; // The number of threads currently reading int thisBatch; // Index in nr of the batch of readers currently reading(0 or 1) int nextBatch = 1; // Index in nr of the batch of readers waitin to read(always 1-thisBatch) int nw; // The number of threads currently writing(0 or 1) int nwtotal; // The number of threads either writing or waiting to write public synchronized void startReading() throws InterruptedException { if (nwtotal == 0) { // 沒有thread要write, 將reader都放到目前要處理的這一批 nr[thisBatch]++; } else { nr[nextBatch]++; int myBatch = nextBatch; while (thisBatch != myBatch) { wait(); } } } public synchronized void stopReading() { nr[thisBatch]--; if (nr[thisBatch] == 0) { // 目前這批的reader都讀完了,找下一個writer notifyAll(); } } public synchronized void startWriting() throws InterruptedException { nwtotal++; while (nr[thisBatch]+nw != 0) { // 目前這批還沒完,或有thread正在寫 wait(); } nw = 1; } public synchronized void stopWriting() { nw = 0; nwtotal--; int tmp = thisBatch; // 交換下一批要讀的 thisBatch = nextBatch; nextBatch = tmp; notifyAll(); } }
給號依序執行
public class TakeANumberMonitor { int nr; // The number of threads currently reading int nextNumber; // The number to be taken by the next thread to arrive int nowServing; // The number of the thread to be served next public synchronized void startReading() throws InterruptedException { int myNumber = nextNumber++; while (nowServing != myNumber) { // 還沒輪到我 wait(); } nr++; // 多了一個Reader nowServing++; // 準備檢查下一個 notifyAll(); } public synchronized void startWriting() throws InterruptedException { int myNumber = nextNumber++; while (nowServing != myNumber) { // 還沒輪到我 wait(); } while (nr > 0) { // 要等所有的Reader結束 wait(); } } public synchronized void stopReading() { nr--; // 少了一個Reader if (nr == 0) { notifyAll(); } } public synchronized void stopWriting() { nowServing++; // 準備檢查下一個 notifyAll(); } }