91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么用Python編寫簡單的聊天程序

發布時間:2023-05-09 10:57:44 來源:億速云 閱讀:96 作者:zzz 欄目:編程語言

這篇文章主要講解了“怎么用Python編寫簡單的聊天程序”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“怎么用Python編寫簡單的聊天程序”吧!

實現思路

x01 服務端的建立

首先,在服務端,使用socket進行消息的接受,每接受一個socket的請求,就開啟一個新的線程來管理消息的分發與接受,同時,又存在一個handler來管理所有的線程,從而實現對聊天室的各種功能的處理

x02 客戶端的建立

客戶端的建立就要比服務端簡單多了,客戶端的作用只是對消息的發送以及接受,以及按照特定的規則去輸入特定的字符從而實現不同的功能的使用,因此,在客戶端這里,只需要去使用兩個線程,一個是專門用于接受消息,一個是專門用于發送消息的

至于為什么不用一個呢,那是因為,只用一個的話,當接受了消息,在發送之前接受消息的處于阻塞狀態,同理,發送消息也是,那么要是將這兩個功能放在一個地方實現,就會導致沒有辦法連續發送或者接受消息了

實現方式

服務端實現

怎么用Python編寫簡單的聊天程序

怎么用Python編寫簡單的聊天程序

import json
import threading
from socket import *
from time import ctime


class PyChattingServer:
    __socket = socket(AF_INET, SOCK_STREAM, 0)
    __address = ('', 12231)

    __buf = 1024

    def __init__(self):
        self.__socket.bind(self.__address)
        self.__socket.listen(20)
        self.__msg_handler = ChattingHandler()

    def start_session(self):
        print('等待客戶連接...\r\n')
        try:
            while True:
                cs, caddr = self.__socket.accept()
                # 利用handler來管理線程,實現線程之間的socket的相互通信
                self.__msg_handler.start_thread(cs, caddr)
        except socket.error:
            pass


class ChattingThread(threading.Thread):
    __buf = 1024

    def __init__(self, cs, caddr, msg_handler):
        super(ChattingThread, self).__init__()
        self.__cs = cs
        self.__caddr = caddr
        self.__msg_handler = msg_handler

    # 使用多線程管理會話
    def run(self):
        try:
            print('...連接來自于:', self.__caddr)
            data = '歡迎你到來PY_CHATTING!請輸入你的很cooooool的昵稱(不能帶有空格喲`)\r\n'
            self.__cs.sendall(bytes(data, 'utf-8'))
            while True:
                data = self.__cs.recv(self.__buf).decode('utf-8')
                if not data:
                    break
                self.__msg_handler.handle_msg(data, self.__cs)
                print(data)
        except socket.error as e:
            print(e.args)
            pass
        finally:
            self.__msg_handler.close_conn(self.__cs)
            self.__cs.close()


class ChattingHandler:
    __help_str = "[ SYSTEM ]\r\n" \
                 "輸入/ls,即可獲得所有登陸用戶信息\r\n" \
                 "輸入/h,即可獲得幫助\r\n" \
                 "輸入@用戶名 (注意用戶名后面的空格)+消息,即可發動單聊\r\n" \
                 "輸入/i,即可屏蔽群聊信息\r\n" \
                 "再次輸入/i,即可取消屏蔽\r\n" \
                 "所有首字符為/的信息都不會發送出去"

    __buf = 1024
    __socket_list = []

    __user_name_to_socket = {}
    __socket_to_user_name = {}

    __user_name_to_broadcast_state = {}

    def start_thread(self, cs, caddr):
        self.__socket_list.append(cs)
        chat_thread = ChattingThread(cs, caddr, self)
        chat_thread.start()

    def close_conn(self, cs):
        if cs not in self.__socket_list:
            return
        # 去除socket的記錄
        nickname = "SOMEONE"
        if cs in self.__socket_list:
            self.__socket_list.remove(cs)
        # 去除socket與username之間的映射關系
        if cs in self.__socket_to_user_name:
            nickname = self.__socket_to_user_name[cs]
            self.__user_name_to_socket.pop(self.__socket_to_user_name[cs])
            self.__socket_to_user_name.pop(cs)
            self.__user_name_to_broadcast_state.pop(nickname)
        nickname += " "
        # 廣播某玩家退出聊天室
        self.broadcast_system_msg(nickname + "離開了PY_CHATTING")

    # 管理用戶輸入的信息
    def handle_msg(self, msg, cs):
        js = json.loads(msg)
        if js['type'] == "login":
            if js['msg'] not in self.__user_name_to_socket:
                if ' ' in js['msg']:
                    self.send_to(json.dumps({
                        'type': 'login',
                        'success': False,
                        'msg': '賬號不能夠帶有空格'
                    }), cs)
                else:
                    self.__user_name_to_socket[js['msg']] = cs
                    self.__socket_to_user_name[cs] = js['msg']
                    self.__user_name_to_broadcast_state[js['msg']] = True
                    self.send_to(json.dumps({
                        'type': 'login',
                        'success': True,
                        'msg': '昵稱建立成功,輸入/ls可查看所有在線的人,輸入/help可以查看幫助(所有首字符為/的消息都不會發送)'
                    }), cs)
                    # 廣播其他人,他已經進入聊天室
                    self.broadcast_system_msg(js['msg'] + "已經進入了聊天室")
            else:
                self.send_to(json.dumps({
                    'type': 'login',
                    'success': False,
                    'msg': '賬號已存在'
                }), cs)
        # 若玩家處于屏蔽模式,則無法發送群聊消息
        elif js['type'] == "broadcast":
            if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:
                self.broadcast(js['msg'], cs)
            else:
                self.send_to(json.dumps({
                    'type': 'broadcast',
                    'msg': '屏蔽模式下無法發送群聊信息'
                }), cs)
        elif js['type'] == "ls":
            self.send_to(json.dumps({
                'type': 'ls',
                'msg': self.get_all_login_user_info()
            }), cs)
        elif js['type'] == "help":
            self.send_to(json.dumps({
                'type': 'help',
                'msg': self.__help_str
            }), cs)
        elif js['type'] == "sendto":
            self.single_chatting(cs, js['nickname'], js['msg'])
        elif js['type'] == "ignore":
            self.exchange_ignore_state(cs)

    def exchange_ignore_state(self, cs):
        if cs in self.__socket_to_user_name:
            state = self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]
            if state:
                state = False
            else:
                state = True
            self.__user_name_to_broadcast_state.pop(self.__socket_to_user_name[cs])
            self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] = state
            if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:
                msg = "通常模式"
            else:
                msg = "屏蔽模式"
            self.send_to(json.dumps({
                'type': 'ignore',
                'success': True,
                'msg': '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), "模式切換成功,現在是" + msg)
            }), cs)
        else:
            self.send_to({
                'type': 'ignore',
                'success': False,
                'msg': '切換失敗'
            }, cs)

    def single_chatting(self, cs, nickname, msg):
        if nickname in self.__user_name_to_socket:
            msg = '[TIME : %s]\r\n[ %s CHATTING TO %s ] : %s\r\n' % (
                ctime(), self.__socket_to_user_name[cs], nickname, msg)
            self.send_to_list(json.dumps({
                'type': 'single',
                'msg': msg
            }), self.__user_name_to_socket[nickname], cs)
        else:
            self.send_to(json.dumps({
                'type': 'single',
                'msg': '該用戶不存在'
            }), cs)
        print(nickname)

    def send_to_list(self, msg, *cs):
        for i in range(len(cs)):
            self.send_to(msg, cs[i])

    def get_all_login_user_info(self):
        login_list = "[ SYSTEM ] ALIVE USER : \r\n"
        for key in self.__socket_to_user_name:
            login_list += self.__socket_to_user_name[key] + ",\r\n"
        return login_list

    def send_to(self, msg, cs):
        if cs not in self.__socket_list:
            self.__socket_list.append(cs)
        cs.sendall(bytes(msg, 'utf-8'))

    def broadcast_system_msg(self, msg):
        data = '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), msg)
        js = json.dumps({
            'type': 'system_msg',
            'msg': data
        })
        # 屏蔽了群聊的玩家也可以獲得系統的群發信息
        for i in range(len(self.__socket_list)):
            if self.__socket_list[i] in self.__socket_to_user_name:
                self.__socket_list[i].sendall(bytes(js, 'utf-8'))

    def broadcast(self, msg, cs):
        data = '[TIME : %s]\r\n[%s] : %s\r\n' % (ctime(), self.__socket_to_user_name[cs], msg)
        js = json.dumps({
            'type': 'broadcast',
            'msg': data
        })
        # 沒有的登陸的玩家無法得知消息,屏蔽了群聊的玩家也沒辦法獲取信息
        for i in range(len(self.__socket_list)):
            if self.__socket_list[i] in self.__socket_to_user_name \
                    and self.__user_name_to_broadcast_state[self.__socket_to_user_name[self.__socket_list[i]]]:
                self.__socket_list[i].sendall(bytes(js, 'utf-8'))


def main():
    server = PyChattingServer()
    server.start_session()


main()

客戶端的實現

怎么用Python編寫簡單的聊天程序

怎么用Python編寫簡單的聊天程序

import json
import threading
from socket import *

is_login = False
is_broadcast = True


class ClientReceiveThread(threading.Thread):
    __buf = 1024

    def __init__(self, cs):
        super(ClientReceiveThread, self).__init__()
        self.__cs = cs

    def run(self):
        self.receive_msg()

    def receive_msg(self):
        while True:
            msg = self.__cs.recv(self.__buf).decode('utf-8')
            if not msg:
                break
            js = json.loads(msg)
            if js['type'] == "login":
                if js['success']:
                    global is_login
                    is_login = True
                print(js['msg'])
            elif js['type'] == "ignore":
                if js['success']:
                    global is_broadcast
                    if is_broadcast:
                        is_broadcast = False
                    else:
                        is_broadcast = True
                print(js['msg'])
            else:
                if not is_broadcast:
                    print("[現在處于屏蔽模式]")
                print(js['msg'])


class ClientSendMsgThread(threading.Thread):

    def __init__(self, cs):
        super(ClientSendMsgThread, self).__init__()
        self.__cs = cs

    def run(self):
        self.send_msg()

    # 根據不同的輸入格式來進行不同的聊天方式
    def send_msg(self):
        while True:
            js = None
            msg = input()
            if not is_login:
                js = json.dumps({
                    'type': 'login',
                    'msg': msg
                })
            elif msg[0] == "@":
                data = msg.split(' ')
                if not data:
                    print("請重新輸入")
                    break
                nickname = data[0]
                nickname = nickname.strip("@")
                if len(data) == 1:
                    data.append(" ")
                js = json.dumps({
                    'type': 'sendto',
                    'nickname': nickname,
                    'msg': data[1]
                })
            elif msg == "/help":
                js = json.dumps({
                    'type': 'help',
                    'msg': None
                })
            elif msg == "/ls":
                js = json.dumps({
                    'type': 'ls',
                    'msg': None
                })
            elif msg == "/i":
                js = json.dumps({
                    'type': 'ignore',
                    'msg': None
                })
            else:
                if msg[0] != '/':
                    js = json.dumps({
                        'type': 'broadcast',
                        'msg': msg
                    })
            if js is not None:
                self.__cs.sendall(bytes(js, 'utf-8'))


def main():
    buf = 1024
    # 改變這個的地址,變成服務器的地址,那么只要部署到服務器上就可以全網使用了
    address = ("127.0.0.1", 12231)
    cs = socket(AF_INET, SOCK_STREAM, 0)
    cs.connect(address)
    data = cs.recv(buf).decode("utf-8")
    if data:
        print(data)
    receive_thread = ClientReceiveThread(cs)
    receive_thread.start()
    send_thread = ClientSendMsgThread(cs)
    send_thread.start()
    while True:
        pass


main()

感謝各位的閱讀,以上就是“怎么用Python編寫簡單的聊天程序”的內容了,經過本文的學習后,相信大家對怎么用Python編寫簡單的聊天程序這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

德州市| 张掖市| 永和县| 瓮安县| 禹城市| 鄢陵县| 鄂伦春自治旗| 沅江市| 厦门市| 花莲县| 米易县| 新民市| 交口县| 元阳县| 洞口县| 宝坻区| 甘谷县| 明星| 饶阳县| 利川市| 延长县| 开原市| 南京市| 高密市| 萨迦县| 南充市| 隆安县| 阳春市| 邓州市| 白沙| 舞阳县| 营口市| 循化| 米脂县| 安庆市| 长阳| 新余市| 屏东市| 兴仁县| 河西区| 项城市|