三天速成Python之Day3——多线程、网络编程
0xFF 前言
- 在前面语法的学习之上,我们来初识多线程和网络。
- 本文将描述多线程,线程锁,队列,套接字的应用。
- 本文将课堂笔记滞后,自写代码放在前面方便阅读。
- 本文笔记见:语雀
- 扩展阅读:
- 今天Python之父退休太无聊,宣布复出并加盟微软??
Ox10 课外补充
Ox11 单线程
- 调用两次函数,函数输出函数名后睡眠1秒
from time import sleep
import time
def f1():
print('f1', time.time())
sleep(1)
pass
def f2():
print('f2', time.time())
sleep(1)
pass
if __name__ == '__main__':
f1()
f2()
Ox12 多线程操作
- 单线程调用函数时,会等待睡眠后才能继续
- 由于多种场景的需要,我们需要让他们一起运行
- 这里我们来创建
# -*- coding: utf-8 -*-
# -*- coder : CO0kie丶 -*-
# -*- time : 20201113 -*-
from time import sleep
import time
import threading
def f1():
print('f1', time.time())
sleep(1)
pass
def f2():
print('f2', time.time())
sleep(1)
pass
if __name__ == '__main__':
gThread = [threading.Thread(target=f1), threading.Thread(target=f2)]
for th in gThread:
th.start()
for th in gThread:
if th.is_alive():
th.join()
pass
Ox13 多线程传参
- 有了多线程之后,那参数该怎么传递呢?
- 这时候我们可以运用元组的打包和解包,然后传递给函数
# -*- coding: utf-8 -*-
# -*- coder : CO0kie丶 -*-
# -*- time : 20201113 -*-
from time import sleep
import threading
def f1(thid, imax):
print('>线程传入的参数', thid, imax)
for i in range(imax):
print(f'>>线程{thid}', f'计数{i + 1}/{imax}')
sleep(0.01)
print('>线程%d 退出' % thid)
pass
def f2(i):
__th = threading.Thread(target=f1, args=(i + 1, 5))
__th.start()
return __th
if __name__ == '__main__':
gThreadN = 2
gThreads = list(map(f2, range(gThreadN)))
print(gThreads)
for th in gThreads:
if th.is_alive():
th.join()
pass
- 运行结果
Ox14 多线程的锁
- 运行上面的线程我们发现打印乱了
- 这时候我们就需要让线程同步一下
- 以下代码展示一下最简单的线程锁
- 除此以外还有很多高级锁 参考文尾
# -*- coding: utf-8 -*-
# -*- coder : CO0kie丶 -*-
# -*- time : 20201113 -*-
from time import sleep
import threading
def f1(lock, thid, imax):
lock.acquire()
print('>线程传入的参数', thid, imax)
lock.release()
for i in range(imax):
lock.acquire()
print(f'>>线程{thid} 计数{i + 1}/{imax}')
lock.release()
sleep(0.01)
lock.acquire()
print('>线程%d 退出' % thid)
lock.release()
pass
def f2(i):
global gThLock
__th = threading.Thread(target=f1, args=(gThLock, i + 1, 5))
__th.start()
return __th
if __name__ == '__main__':
gThLock = threading.Lock()
gThreadN = 2
gThreads = list(map(f2, range(gThreadN)))
for th in gThreads:
if th.is_alive():
th.join()
pass
- 运行结果
>线程传入的参数 1 5
>>线程1 计数1/5
>线程传入的参数 2 5
>>线程2 计数1/5
>>线程2 计数2/5
>>线程1 计数2/5
>>线程2 计数3/5
>>线程1 计数3/5
>>线程2 计数4/5
>>线程1 计数4/5
>>线程1 计数5/5
>>线程2 计数5/5
>线程2 退出
>线程1 退出
Ox15 多线程封装
- 上面的多线程锁还是面向函数式编程的
- 让我们来封装一下,改成面向对象的吧
# -*- coding: utf-8 -*-
# -*- coder : CO0kie丶 -*-
# -*- time : 20201113 -*-
import threading
from time import sleep
class MyThread(threading.Thread):
def __init__(self, lock, thid):
threading.Thread.__init__(self)
self.lock = lock
self.name = f'线程{thid}'
self.start()
def print(self, text: str):
self.lock.acquire()
print(f'>>{self.name}:{text}')
self.lock.release()
def run(self):
global num
self.print(f'传入 {self.name} {num}')
for i in range(num):
self.print(f'计数{i + 1}/{num}')
sleep(0.01)
self.print('退出')
class ThreadCtrl(object):
def funinit(self, thid):
return MyThread(self.lock, thid + 1)
def __init__(self, threadNum: int):
self.lock = threading.Lock()
self.Threads = list(map(self.funinit, range(threadNum)))
pass
if __name__ == '__main__':
num = 5
ThreadCtrl(2)
- 运行结果
>>线程1:传入 线程1 5
>>线程1:计数1/5
>>线程2:传入 线程2 5
>>线程2:计数1/5
>>线程1:计数2/5
>>线程2:计数2/5
>>线程1:计数3/5
>>线程2:计数3/5
>>线程1:计数4/5
>>线程2:计数4/5
>>线程2:计数5/5
>>线程1:计数5/5
>>线程1:退出
>>线程2:退出
Ox16 多线程队列
- 多线程已经封装成类了
- 那我们把队列给加进去
# -*- coding: utf-8 -*-
# -*- coder : CO0kie丶 -*-
# -*- time : 20201113 -*-
import threading
import queue
from time import sleep
from functools import reduce
class MyThread(threading.Thread):
def __init__(self, _lock, _thid, _que):
threading.Thread.__init__(self)
self.lock = _lock
self.thid = _thid
self.que = _que
self.max = _que.qsize()
def print(self, text: str):
self.lock.acquire()
print(f'>>{self.name}:{text}')
self.lock.release()
def run(self):
self.print(f'传入 {self.thid};队列数 {self.max}')
sleep(0.01)
while not self.que.empty():
task = self.que.get()
self.print(f'计数{task}/{self.max}')
sleep(0.01)
self.print('退出')
class ThreadCtrl(object):
def funinit(self, thid):
return MyThread(self.lock, thid + 1, self.que)
def __init__(self, _threadNum, _que):
self.que = _que
self.lock = threading.Lock()
self.Threads = list(map(self.funinit, range(_threadNum)))
def start(self, _bJoin=True):
# 循环启动线程
for th in self.Threads:
th.start()
# 循环等待线程返回
if not _bJoin:
return
for th in self.Threads:
if th.is_alive():
th.join()
return
if __name__ == '__main__':
# 初始化队列
que = queue.Queue()
# 循环加入队列
for i in range(10):
que.put(i + 1)
# 初始化线程,并启动
ThreadCtrl(2, que).start(True)
print('>主线程结束')
pass
- 运行结果
>>Thread-1:传入 1;队列数 10
>>Thread-2:传入 2;队列数 10
>>Thread-2:计数1/10
>>Thread-1:计数2/10
>>Thread-2:计数3/10
>>Thread-1:计数4/10
>>Thread-1:计数5/10
>>Thread-2:计数6/10
>>Thread-1:计数7/10
>>Thread-2:计数8/10
>>Thread-1:计数9/10
>>Thread-2:计数10/10
>>Thread-1:退出
>>Thread-2:退出
>主线程结束
Process finished with exit code 0
Ox17 线程池应用
'''挖个坑,待更新'''
Ox00 课堂笔记
Ox01 文件操作
# 使用 Python 内置函数完成文件操作
# 参数一是文件所在的路径,参数二是打开方式,可能需要注意 encoding
file = open('content.txt', 'w+')
# 向文件的内部写入数据
file.write('hello 15pb')
file.writelines(['第一行\n', '第二行\n', '第三行\n'])
# 关闭文件,将对文件的修改保存到硬盘
file.close()
# 通过 with open() as name 的形式打开一个文件
with open('content.txt', 'r') as file:
# 将 open 函数的返回值保存到变量 file 中
# 对于文件对象,在离开 with 作用域的时候
# 会自动的进行关闭,只需要关注逻辑部分
print(file.read()) # 默认读取所有内容
print(file.readline()) # 一行最大字符个数
print(file.readlines()) # 读取的最多行数
# 以只读方式打开文件的时候,如果文件不存在就会产生异常
try:
with open('content2.txt') as file:
pass
except Exception as e:
print(e)
# 更多的函数: file.seek + file.tell -> 计算文件大小
# 文件操作模块: os,os 的使用方式和 C 语言的函数完全一致
Ox02 结构体操作
# 通过 struct 可以实现字节流到结构体的转换
import struct
# 打包成 int + int + char[1024] 的结构体
bytes_content = struct.pack('ii1024s', 10, 20, b'hello15pb')
print(bytes_content)
# 将 nt + int + char[1024] 的结构体进行解包
type, length, content = struct.unpack('ii1024s', bytes_content)
print(type, length, content.decode('utf8').strip('\0'))
Ox03 线程模块
# 使用 threading 可以实现线程操作(伪线程)
import threading
import time
# 不带参数的线程回调函数
def worker_thread():
for i in range(1000):
# current_thread 表示的始终都是当前的线程
print(threading.current_thread().name, i)
time.sleep(0.1)
# 模拟主函数的使用
def main():
# 通过 Thread 函数创建一个线程,需要指定起始位置(函数)
t = threading.Thread(target=worker_thread, name='worker_thread')
# Thread 创建的实际是线程对象,默认并没有运行,需要调用 start 函数
t.start()
for i in range(1000):
# current_thread 表示的始终都是当前的线程
print(threading.current_thread().name, i)
time.sleep(0.1)
# 通常,为了确保主线程退出之前,所有其它线程执行完毕,需要等待
t.join()
if __name__ == '__main__':
main()
Ox04 线程传参
import threading
# 线程回调函数,带参
def thread1(arg1, arg2, arg3):
print(arg1, type(arg1))
print(arg2, type(arg2))
print(arg3, type(arg3))
# 线程回调函数,带元组变参
def thread2(*args):
print(args, type(args))
# 线程回调函数,带字典变参
def thread3(**kwargs):
print(kwargs, type(kwargs))
# 线程回调函数,带元组和字典变参
def thread4(*args, **kwargs):
print(args, type(args))
print(kwargs, type(kwargs))
# 创建多个线程,传递相应的参数
threading.Thread(target=thread2, args=(1, 2, 3)).start()
threading.Thread(target=thread3, kwargs={"a": 1, "b": 2, "c": 3}).start()
threading.Thread(target=thread4, args=(1, 2, 3), kwargs={"a": 1, "b": 2, "c": 3}).start()
threading.Thread(target=thread1, args=(1, 2, 3)).start()
Ox05 线程的锁
# 使用 threading 可以实现线程操作(伪线程)
import threading
import time
# 定义一个全局的锁
lock = threading.Lock()
# 不带参数的线程回调函数
def worker_thread():
for i in range(1000):
lock.acquire()
# current_thread 表示的始终都是当前的线程
print(threading.current_thread().name, i)
time.sleep(0.1)
lock.release()
# 模拟主函数的使用
def main():
# 通过 Thread 函数创建一个线程,需要指定起始位置(函数)
t = threading.Thread(target=worker_thread, name='worker_thread')
# Thread 创建的实际是线程对象,默认并没有运行,需要调用 start 函数
t.start()
for i in range(1000):
lock.acquire()
# current_thread 表示的始终都是当前的线程
print(threading.current_thread().name, i)
time.sleep(0.1)
lock.release()
# 通常,为了确保主线程退出之前,所有其它线程执行完毕,需要等待
t.join()
if __name__ == '__main__':
main()
Ox06 数据库操作
# 通过 pymysql 连接并操作 mysql 数据库
import pymysql
class Mysql(object):
def __init__(self, database_name):
try:
# 通过 connect 函数传入数据库的配置信息连接到数据库
self.connect = pymysql.connect(host='127.0.0.1', user='root',
password='123456', port=3306, database=database_name)
# 一旦数据库连接成功,我们就需要获取到游标对象
self.cursor = self.connect.cursor()
except Exception as e:
print('error', e)
def insert(self, sql: str):
try:
self.cursor.execute(sql)
# 对于所有修改数据库的操作,都需要提交
self.connect.commit()
except Exception as e:
self.connect.rollback()
print('error', e)
def select(self, sql: str):
try:
self.cursor.execute(sql)
# 从数据库中获取到查询的结果集,返回的是一个元组,
# 元组中的每一个元素表示一行,保存的是一行中的每一列
result = self.cursor.fetchall()
count = self.cursor.rowcount
# 将查询到的行数和内容进行打包,返回给调用方
return count, result
except Exception as e:
print('error', e)
if __name__ == '__main__':
sql = Mysql('student')
sql.insert("INSERT INTO stu_class VALUE(10, 'ten')")
print(sql.select('select * from stu_class;'))
Ox07 客户端程序
from socket import *
def main():
# 1. 创建套接字对象
client = socket(AF_INET, SOCK_STREAM)
# 2. 等待客户端的连接
client.connect(('127.0.0.1', 0x1515))
# 3. 收发数据
print(client.recv(100).decode('utf-8'))
# 6. 关闭套接字
client.close()
if __name__ == '__main__':
main()
Ox08 服务端操作
from socket import *
def main():
# 1. 创建套接字对象
server = socket(AF_INET, SOCK_STREAM)
# 2. 绑定对象到指定的ip和端口
server.bind(('127.0.0.1', 0x1515))
# 3. 开启套接字的监听状态
server.listen(SOMAXCONN)
# 4. 等待客户端的连接
client, address = server.accept()
# 5. 收发数据
client.send('welcome'.encode('utf-8'))
# 6. 关闭套接字
client.close()
server.close()
if __name__ == '__main__':
main()
Ox09 聊天室
'''挖个坑,待更新'''
Ox20 引用
【Python】 多线程并发threading & 任务队列Queue - K.Takanashi - 博客园
https://www.cnblogs.com/franknihao/p/6627857.html
python--threading多线程总结 - 苍松 - 博客园
https://www.cnblogs.com/tkqasn/p/5700281.html
Python多线程threading进阶笔记 - 刘元涛的个人页面 - OSCHINA
https://my.oschina.net/liuyuantao/blog/1154194
python多线程、锁、event事件机制的简单使用 - 简书
https://www.jianshu.com/p/8301f1083d5e
细说Python的lambda函数用法,建议收藏 - 知乎
- END
