您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關Linux下如何使用管道和消息隊列,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
POSIX 的一個核心目標就是線程安全。
請查看一些 mq_open 函數的 man 頁,這個函數屬于內存隊列的 API。這個 man 頁中有關 特性 的章節帶有一個小表格:
接口 | 特性 | 值 |
---|---|---|
mq_open() | 線程安全 | MT-Safe |
上面的 MT-Safe(MT 指的是多線程)意味著 mq_open
函數是線程安全的,進而暗示是進程安全的:一個進程的執行和它的一個線程執行的過程類似,假如競爭條件不會發生在處于相同進程的線程中,那么這樣的條件也不會發生在處于不同進程的線程中。MT-Safe 特性保證了調用 mq_open
時不會出現競爭條件。一般來說,基于通道的 IPC 是并發安全的,盡管在下面例子中會出現一個有關警告的注意事項。
首先讓我們通過一個特意構造的命令行例子來展示無名管道是如何工作的。在所有的現代系統中,符號 |
在命令行中都代表一個無名管道。假設我們的命令行提示符為 %
,接下來考慮下面的命令:
## 寫入方在 | 左邊,讀取方在右邊% sleep 5 | echo "Hello, world!"
sleep
和 echo
程序以不同的進程執行,無名管道允許它們進行通信。但是上面的例子被特意設計為沒有通信發生。問候語 “Hello, world!” 出現在屏幕中,然后過了 5 秒后,命令行返回,暗示 sleep
和 echo
進程都已經結束了。這期間發生了什么呢?
在命令行中的豎線 |
的語法中,左邊的進程(sleep
)是寫入方,右邊的進程(echo
)為讀取方。默認情況下,讀取方將會阻塞,直到從通道中能夠讀取到字節數據,而寫入方在寫完它的字節數據后,將發送流已終止的標志。(即便寫入方過早終止了,一個流已終止的標志還是會發給讀取方。)無名管道將保持到寫入方和讀取方都停止的那個時刻。
在上面的例子中,sleep
進程并沒有向通道寫入任何的字節數據,但在 5 秒后就終止了,這時將向通道發送一個流已終止的標志。與此同時,echo
進程立即向標準輸出(屏幕)寫入問候語,因為這個進程并不從通道中讀入任何字節,所以它并沒有等待。一旦 sleep
和 echo
進程都終止了,不會再用作通信的無名管道將會消失然后返回命令行提示符。
下面這個更加實用的示例將使用兩個無名管道。我們假定文件 test.dat
的內容如下:
thisisthewaytheworldends
下面的命令:
% cat test.dat | sort | uniq
會將 cat
(連接的縮寫)進程的輸出通過管道傳給 sort
進程以生成排序后的輸出,然后將排序后的輸出通過管道傳給 uniq
進程以消除重復的記錄(在本例中,會將兩次出現的 “the” 縮減為一個):
endsisthethiswayworld
下面展示的情景展示的是一個帶有兩個進程的程序通過一個無名管道通信來進行通信。
#include <sys/wait.h> /* wait */#include <stdio.h>#include <stdlib.h> /* exit functions */#include <unistd.h> /* read, write, pipe, _exit */#include <string.h> #define ReadEnd 0#define WriteEnd 1 void report_and_exit(const char* msg) { [perror][6](msg); [exit][7](-1); /** failure **/} int main() { int pipeFDs[2]; /* two file descriptors */ char buf; /* 1-byte buffer */ const char* msg = "Nature's first green is gold\n"; /* bytes to write */ if (pipe(pipeFDs) < 0) report_and_exit("pipeFD"); pid_t cpid = fork(); /* fork a child process */ if (cpid < 0) report_and_exit("fork"); /* check for failure */ if (0 == cpid) { /*** child ***/ /* child process */ close(pipeFDs[WriteEnd]); /* child reads, doesn't write */ while (read(pipeFDs[ReadEnd], &buf, 1) > 0) /* read until end of byte stream */ write(STDOUT_FILENO, &buf, sizeof(buf)); /* echo to the standard output */ close(pipeFDs[ReadEnd]); /* close the ReadEnd: all done */ _exit(0); /* exit and notify parent at once */ } else { /*** parent ***/ close(pipeFDs[ReadEnd]); /* parent writes, doesn't read */ write(pipeFDs[WriteEnd], msg, [strlen][8](msg)); /* write the bytes to the pipe */ close(pipeFDs[WriteEnd]); /* done writing: generate eof */ wait(NULL); /* wait for child to exit */ [exit][7](0); /* exit normally */ } return 0;}
上面名為 pipeUN
的程序使用系統函數 fork
來創建一個進程。盡管這個程序只有一個單一的源文件,在它正確執行的情況下將會發生多進程的情況。
下面的內容是對庫函數
fork
如何工作的一個簡要回顧:
fork
函數由父進程調用,在失敗時返回-1
給父進程。在pipeUN
這個例子中,相應的調用是:函數調用后的返回值也被保存下來了。在這個例子中,保存在整數類型
pid_t
的變量cpid
中。(每個進程有它自己的進程 ID,這是一個非負的整數,用來標記進程)。復刻一個新的進程可能會因為多種原因而失敗,包括進程表滿了的原因,這個結構由系統維持,以此來追蹤進程狀態。明確地說,僵尸進程假如沒有被處理掉,將可能引起進程表被填滿的錯誤。
pid_t cpid = fork(); /* called in parent */
假如
fork
調用成功,則它將創建一個新的子進程,向父進程返回一個值,向子進程返回另外的一個值。在調用fork
后父進程和子進程都將執行相同的代碼。(子進程繼承了到此為止父進程中聲明的所有變量的拷貝),特別地,一次成功的fork
調用將返回如下的東西:
向子進程返回
0
向父進程返回子進程的進程 ID
在一次成功的
fork
調用后,一個if
/else
或等價的結構將會被用來隔離針對父進程和子進程的代碼。在這個例子中,相應的聲明為:
if (0 == cpid) { /*** child ***/
...
}
else { /*** parent ***/
...
}
假如成功地復刻出了一個子進程,pipeUN
程序將像下面這樣去執行。在一個整數的數列里:
int pipeFDs[2]; /* two file descriptors */
來保存兩個文件描述符,一個用來向管道中寫入,另一個從管道中寫入。(數組元素 pipeFDs[0]
是讀端的文件描述符,元素 pipeFDs[1]
是寫端的文件描述符。)在調用 fork
之前,對系統 pipe
函數的成功調用,將立刻使得這個數組獲得兩個文件描述符:
if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");
父進程和子進程現在都有了文件描述符的副本。但分離關注點模式意味著每個進程恰好只需要一個描述符。在這個例子中,父進程負責寫入,而子進程負責讀取,盡管這樣的角色分配可以反過來。在 if
子句中的***個語句將用于關閉管道的讀端:
close(pipeFDs[WriteEnd]); /* called in child code */
在父進程中的 else
子句將會關閉管道的讀端:
close(pipeFDs[ReadEnd]); /* called in parent code */
然后父進程將向無名管道中寫入某些字節數據(ASCII 代碼),子進程讀取這些數據,然后向標準輸出中回放它們。
在這個程序中還需要澄清的一點是在父進程代碼中的 wait
函數。一旦被創建后,子進程很大程度上獨立于它的父進程,正如簡短的 pipeUN
程序所展示的那樣。子進程可以執行任意的代碼,而它們可能與父進程完全沒有關系。但是,假如當子進程終止時,系統將會通過一個信號來通知父進程。
要是父進程在子進程之前終止又該如何呢?在這種情形下,除非采取了預防措施,子進程將會變成在進程表中的一個僵尸進程。預防措施有兩大類型:***種是讓父進程去通知系統,告訴系統它對子進程的終止沒有任何興趣:
signal(SIGCHLD, SIG_IGN); /* in parent: ignore notification */
第二種方法是在子進程終止時,讓父進程執行一個 wait
。這樣就確保了父進程可以獨立于子進程而存在。在 pipeUN
程序中使用了第二種方法,其中父進程的代碼使用的是下面的調用:
wait(NULL); /* called in parent */
這個對 wait
的調用意味著一直等待直到任意一個子進程的終止發生,因此在 pipeUN
程序中,只有一個子進程。(其中的 NULL
參數可以被替換為一個保存有子程序退出狀態的整數變量的地址。)對于更細粒度的控制,還可以使用更靈活的 waitpid
函數,例如特別指定多個子進程中的某一個。
pipeUN
將會采取另一個預防措施。當父進程結束了等待,父進程將會調用常規的 exit
函數去退出。對應的,子進程將會調用 _exit
變種來退出,這類變種將快速跟蹤終止相關的通知。在效果上,子進程會告訴系統立刻去通知父進程它的這個子進程已經終止了。
假如兩個進程向相同的無名管道中寫入內容,字節數據會交錯嗎?例如,假如進程 P1 向管道寫入內容:
foo bar
同時進程 P2 并發地寫入:
baz baz
到相同的管道,***的結果似乎是管道中的內容將會是任意錯亂的,例如像這樣:
baz foo baz bar
只要沒有寫入超過 PIPE_BUF
字節,POSIX 標準就能確保寫入不會交錯。在 Linux 系統中, PIPE_BUF
的大小是 4096 字節。對于管道我更喜歡只有一個寫入方和一個讀取方,從而繞過這個問題。
無名管道沒有備份文件:系統將維持一個內存緩存來將字節數據從寫方傳給讀方。一旦寫方和讀方終止,這個緩存將會被回收,進而無名管道消失。相反的,命名管道有備份文件和一個不同的 API。
下面讓我們通過另一個命令行示例來了解命名管道的要點。下面是具體的步驟:
開啟兩個終端。這兩個終端的工作目錄應該相同。
在其中一個終端中,鍵入下面的兩個命令(命令行提示符仍然是 %
,我的注釋以 ##
打頭。):
在最開始,沒有任何東西會出現在終端中,因為到現在為止沒有在命名管道中寫入任何東西。
% mkfifo tester ## 創建一個備份文件,名為 tester
% cat tester ## 將管道的內容輸出到 stdout
在第二個終端中輸入下面的命令:
無論在這個終端中輸入什么,它都會在另一個終端中顯示出來。一旦鍵入 Ctrl+C
,就會回到正常的命令行提示符,因為管道已經被關閉了。
% cat > tester ## redirect keyboard input to the pipe
hello, world! ## then hit Return key
bye, bye ## ditto
<Control-C> ## terminate session with a Control-C
通過移除實現命名管道的文件來進行清理:
% unlink tester
正如 mkfifo
程序的名字所暗示的那樣,命名管道也被叫做 FIFO,因為***個進入的字節,就會***個出,其他的類似。有一個名為 mkfifo
的庫函數,用它可以在程序中創建一個命名管道,它將在下一個示例中被用到,該示例由兩個進程組成:一個向命名管道寫入,而另一個從該管道讀取。
#include <sys/types.h>#include <sys/stat.h>#include <fcntl.h> #include <unistd.h>#include <time.h>#include <stdlib.h>#include <stdio.h> #define MaxLoops 12000 /* outer loop */#define ChunkSize 16 /* how many written at a time */#define IntsPerChunk 4 /* four 4-byte ints per chunk */#define MaxZs 250 /* max microseconds to sleep */ int main() { const char* pipeName = "./fifoChannel"; mkfifo(pipeName, 0666); /* read/write for user/group/others */ int fd = open(pipeName, O_CREAT | O_WRONLY); /* open as write-only */ if (fd < 0) return -1; /** error **/ int i; for (i = 0; i < MaxLoops; i++) { /* write MaxWrites times */ int j; for (j = 0; j < ChunkSize; j++) { /* each time, write ChunkSize bytes */ int k; int chunk[IntsPerChunk]; for (k = 0; k < IntsPerChunk; k++) chunk[k] = [rand][9](); write(fd, chunk, sizeof(chunk)); } usleep(([rand][9]() % MaxZs) + 1); /* pause a bit for realism */ } close(fd); /* close pipe: generates an end-of-file */ unlink(pipeName); /* unlink from the implementing file */ [printf][10]("%i ints sent to the pipe.\n", MaxLoops * ChunkSize * IntsPerChunk); return 0;}
上面的 fifoWriter
程序可以被總結為如下:
首先程序創建了一個命名管道用來寫入數據:
其中的 pipeName
是備份文件的名字,傳遞給 mkfifo
作為它的***個參數。接著命名管道通過我們熟悉的 open
函數調用被打開,而這個函數將會返回一個文件描述符。
mkfifo(pipeName, 0666); /* read/write perms for user/group/others */
int fd = open(pipeName, O_CREAT | O_WRONLY);
在實現層面上,fifoWriter
不會一次性將所有的數據都寫入,而是寫入一個塊,然后休息隨機數目的微秒時間,接著再循環往復。總的來說,有 768000 個 4 字節整數值被寫入到命名管道中。
在關閉命名管道后,fifoWriter
也將使用 unlink
取消對該文件的連接。
一旦連接到管道的每個進程都執行了 unlink
操作后,系統將回收這些備份文件。在這個例子中,只有兩個這樣的進程 fifoWriter
和 fifoReader
,它們都做了 unlink
操作。
close(fd); /* close pipe: generates end-of-stream marker */
unlink(pipeName); /* unlink from the implementing file */
這個兩個程序應該在不同終端的相同工作目錄中執行。但是 fifoWriter
應該在 fifoReader
之前被啟動,因為需要 fifoWriter
去創建管道。然后 fifoReader
才能夠獲取到剛被創建的命名管道。
#include <stdio.h>#include <stdlib.h>#include <string.h>#include <fcntl.h>#include <unistd.h> unsigned is_prime(unsigned n) { /* not pretty, but gets the job done efficiently */ if (n <= 3) return n > 1; if (0 == (n % 2) || 0 == (n % 3)) return 0; unsigned i; for (i = 5; (i * i) <= n; i += 6) if (0 == (n % i) || 0 == (n % (i + 2))) return 0; return 1; /* found a prime! */} int main() { const char* file = "./fifoChannel"; int fd = open(file, O_RDONLY); if (fd < 0) return -1; /* no point in continuing */ unsigned count = 0, total = 0, primes_count = 0; while (1) { int next; int i; ssize_t count = read(fd, &next, sizeof(int)); if (0 == count) break; /* end of stream */ else if (count == sizeof(int)) { /* read a 4-byte int value */ total++; if (is_prime(next)) primes_count++; } } close(fd); /* close pipe from read end */ unlink(file); /* unlink from the underlying file */ [printf][10]("Received ints: %u, primes: %u\n", total, primes_count); return 0;}
上面的 fifoReader
的內容可以總結為如下:
因為 fifoWriter
已經創建了命名管道,所以 fifoReader
只需要利用標準的 open
調用來通過備份文件來獲取到管道中的內容:
這個文件的是以只讀打開的。
const char* file = "./fifoChannel";
int fd = open(file, O_RDONLY);
然后這個程序進入一個潛在的***循環,在每次循環時,嘗試讀取 4 字節的塊。read
調用:
返回 0 來暗示該流的結束。在這種情況下,fifoReader
跳出循環,關閉命名管道,并在終止前 unlink
備份文件。
ssize_t count = read(fd, &next, sizeof(int));
在讀入 4 字節整數后,fifoReader
檢查這個數是否為質數。這個操作代表了一個生產級別的讀取器可能在接收到的字節數據上執行的邏輯操作。在示例運行中,在接收到的 768000 個整數中有 37682 個質數。
重復運行示例, fifoReader
將成功地讀取 fifoWriter
寫入的所有字節。這不是很讓人驚訝的。這兩個進程在相同的機器上執行,從而可以不用考慮網絡相關的問題。命名管道是一個可信且高效的 IPC 機制,因而被廣泛使用。
下面是這兩個程序的輸出,它們在不同的終端中啟動,但處于相同的工作目錄:
% ./fifoWriter768000 ints sent to the pipe.###% ./fifoReaderReceived ints: 768000, primes: 37682
管道有著嚴格的先入先出行為:***個被寫入的字節將會***個被讀,第二個寫入的字節將第二個被讀,以此類推。消息隊列可以做出相同的表現,但它又足夠靈活,可以使得字節塊可以不以先入先出的次序來接收。
正如它的名字所提示的那樣,消息隊列是一系列的消息,每個消息包含兩部分:
荷載,一個字節序列(在 C 中是 char)
類型,以一個正整數值的形式給定,類型用來分類消息,為了更靈活的回收
看一下下面對一個消息隊列的描述,每個消息由一個整數類型標記:
+-+ +-+ +-+ +-+sender--->|3|--->|2|--->|2|--->|1|--->receiver +-+ +-+ +-+ +-+
在上面展示的 4 個消息中,標記為 1 的是開頭,即最接近接收端,然后另個標記為 2 的消息,***接著一個標記為 3 的消息。假如按照嚴格的 FIFO 行為執行,消息將會以 1-2-2-3 這樣的次序被接收。但是消息隊列允許其他收取次序。例如,消息可以被接收方以 3-2-1-2 的次序接收。
mqueue
示例包含兩個程序,sender
將向消息隊列中寫入數據,而 receiver
將從這個隊列中讀取數據。這兩個程序都包含的頭文件 queue.h
如下所示:
#define ProjectId 123#define PathName "queue.h" /* any existing, accessible file would do */#define MsgLen 4#define MsgCount 6 typedef struct { long type; /* must be of type long */ char payload[MsgLen + 1]; /* bytes in the message */ } queuedMessage;
上面的頭文件定義了一個名為 queuedMessage
的結構類型,它帶有 payload
(字節數組)和 type
(整數)這兩個域。該文件也定義了一些符號常數(使用 #define
語句),前兩個常數被用來生成一個 key
,而這個 key
反過來被用來獲取一個消息隊列的 ID。ProjectId
可以是任何正整數值,而 PathName
必須是一個存在的、可訪問的文件,在這個示例中,指的是文件 queue.h
。在 sender
和 receiver
中,它們都有的設定語句為:
key_t key = ftok(PathName, ProjectId); /* generate key */int qid = msgget(key, 0666 | IPC_CREAT); /* use key to get queue id */
ID qid
在效果上是消息隊列文件描述符的對應物。
#include <stdio.h> #include <sys/ipc.h> #include <sys/msg.h>#include <stdlib.h>#include <string.h>#include "queue.h" void report_and_exit(const char* msg) { [perror][6](msg); [exit][7](-1); /* EXIT_FAILURE */} int main() { key_t key = ftok(PathName, ProjectId); if (key < 0) report_and_exit("couldn't get key..."); int qid = msgget(key, 0666 | IPC_CREAT); if (qid < 0) report_and_exit("couldn't get queue id..."); char* payloads[] = {"msg1", "msg2", "msg3", "msg4", "msg5", "msg6"}; int types[] = {1, 1, 2, 2, 3, 3}; /* each must be > 0 */ int i; for (i = 0; i < MsgCount; i++) { /* build the message */ queuedMessage msg; msg.type = types[i]; [strcpy][11](msg.payload, payloads[i]); /* send the message */ msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT); /* don't block */ [printf][10]("%s sent as type %i\n", msg.payload, (int) msg.type); } return 0;}
上面的 sender
程序將發送出 6 個消息,每兩個為一個類型:前兩個是類型 1,接著的連個是類型 2,***的兩個為類型 3。發送的語句:
msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT);
被配置為非阻塞的(IPC_NOWAIT
標志),是因為這里的消息體量上都很小。唯一的危險在于一個完整的序列將可能導致發送失敗,而這個例子不會。下面的 receiver
程序也將使用 IPC_NOWAIT
標志來接收消息。
#include <stdio.h> #include <sys/ipc.h> #include <sys/msg.h>#include <stdlib.h>#include "queue.h" void report_and_exit(const char* msg) { [perror][6](msg); [exit][7](-1); /* EXIT_FAILURE */} int main() { key_t key= ftok(PathName, ProjectId); /* key to identify the queue */ if (key < 0) report_and_exit("key not gotten..."); int qid = msgget(key, 0666 | IPC_CREAT); /* access if created already */ if (qid < 0) report_and_exit("no access to queue..."); int types[] = {3, 1, 2, 1, 3, 2}; /* different than in sender */ int i; for (i = 0; i < MsgCount; i++) { queuedMessage msg; /* defined in queue.h */ if (msgrcv(qid, &msg, sizeof(msg), types[i], MSG_NOERROR | IPC_NOWAIT) < 0) [puts][12]("msgrcv trouble..."); [printf][10]("%s received as type %i\n", msg.payload, (int) msg.type); } /** remove the queue **/ if (msgctl(qid, IPC_RMID, NULL) < 0) /* NULL = 'no flags' */ report_and_exit("trouble removing queue..."); return 0; }
這個 receiver
程序不會創建消息隊列,盡管 API 盡管建議那樣。在 receiver
中,對
int qid = msgget(key, 0666 | IPC_CREAT);
的調用可能因為帶有 IPC_CREAT
標志而具有誤導性,但是這個標志的真實意義是如果需要就創建,否則直接獲取。sender
程序調用 msgsnd
來發送消息,而 receiver
調用 msgrcv
來接收它們。在這個例子中,sender
以 1-1-2-2-3-3 的次序發送消息,但 receiver
接收它們的次序為 3-1-2-1-3-2,這顯示消息隊列沒有被嚴格的 FIFO 行為所拘泥:
% ./sendermsg1 sent as type 1msg2 sent as type 1msg3 sent as type 2msg4 sent as type 2msg5 sent as type 3msg6 sent as type 3 % ./receivermsg5 received as type 3msg1 received as type 1msg3 received as type 2msg2 received as type 1msg6 received as type 3msg4 received as type 2
上面的輸出顯示 sender
和 receiver
可以在同一個終端中啟動。輸出也顯示消息隊列是持久的,即便 sender
進程在完成創建隊列、向隊列寫數據、然后退出的整個過程后,該隊列仍然存在。只有在 receiver
進程顯式地調用 msgctl
來移除該隊列,這個隊列才會消失:
if (msgctl(qid, IPC_RMID, NULL) < 0) /* remove queue */
關于“Linux下如何使用管道和消息隊列”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。