三天速成Python之Day3——多线程、网络编程

0xFF 前言

  • 在前面语法的学习之上,我们来初识多线程和网络。
  • 本文将描述多线程,线程锁,队列,套接字的应用。
  • 本文将课堂笔记滞后,自写代码放在前面方便阅读。
  • 本文笔记见:语雀
  • 扩展阅读:
  • 今天Python之父退休太无聊,宣布复出并加盟微软??

Ox10 课外补充

Ox11 单线程

  • 调用两次函数,函数输出函数名后睡眠1秒
from time import sleep
import time


def f1():
    print('f1', time.time())
    sleep(1)
    pass


def f2():
    print('f2', time.time())
    sleep(1)
    pass


if __name__ == '__main__':
    f1()
    f2()

Ox12 多线程操作

  • 单线程调用函数时,会等待睡眠后才能继续
  • 由于多种场景的需要,我们需要让他们一起运行
  • 这里我们来创建
# -*- coding: utf-8 -*-
# -*- coder : CO0kie丶 -*-
# -*- time  : 20201113 -*-
from time import sleep
import time
import threading


def f1():
    print('f1', time.time())
    sleep(1)
    pass


def f2():
    print('f2', time.time())
    sleep(1)
    pass


if __name__ == '__main__':
    gThread = [threading.Thread(target=f1), threading.Thread(target=f2)]
    for th in gThread:
        th.start()
    for th in gThread:
        if th.is_alive():
            th.join()
    pass

Ox13 多线程传参

  • 有了多线程之后,那参数该怎么传递呢?
  • 这时候我们可以运用元组的打包和解包,然后传递给函数
# -*- coding: utf-8 -*-
# -*- coder : CO0kie丶 -*-
# -*- time  : 20201113 -*-
from time import sleep
import threading


def f1(thid, imax):
    print('>线程传入的参数', thid, imax)
    for i in range(imax):
        print(f'>>线程{thid}', f'计数{i + 1}/{imax}')
        sleep(0.01)
    print('>线程%d 退出' % thid)
    pass


def f2(i):
    __th = threading.Thread(target=f1, args=(i + 1, 5))
    __th.start()
    return __th


if __name__ == '__main__':
    gThreadN = 2
    gThreads = list(map(f2, range(gThreadN)))
    print(gThreads)
    for th in gThreads:
        if th.is_alive():
            th.join()
    pass
  • 运行结果

image.png


Ox14 多线程的锁

  • 运行上面的线程我们发现打印乱了
  • 这时候我们就需要让线程同步一下
  • 以下代码展示一下最简单的线程锁
  • 除此以外还有很多高级锁 参考文尾
# -*- coding: utf-8 -*-
# -*- coder : CO0kie丶 -*-
# -*- time  : 20201113 -*-
from time import sleep
import threading


def f1(lock, thid, imax):
    lock.acquire()
    print('>线程传入的参数', thid, imax)
    lock.release()
    for i in range(imax):
        lock.acquire()
        print(f'>>线程{thid} 计数{i + 1}/{imax}')
        lock.release()
        sleep(0.01)
    lock.acquire()
    print('>线程%d 退出' % thid)
    lock.release()
    pass


def f2(i):
    global gThLock
    __th = threading.Thread(target=f1, args=(gThLock, i + 1, 5))
    __th.start()
    return __th


if __name__ == '__main__':
    gThLock = threading.Lock()
    gThreadN = 2
    gThreads = list(map(f2, range(gThreadN)))
    for th in gThreads:
        if th.is_alive():
            th.join()
    pass
  • 运行结果
>线程传入的参数 1 5
>>线程1 计数1/5
>线程传入的参数 2 5
>>线程2 计数1/5
>>线程2 计数2/5
>>线程1 计数2/5
>>线程2 计数3/5
>>线程1 计数3/5
>>线程2 计数4/5
>>线程1 计数4/5
>>线程1 计数5/5
>>线程2 计数5/5
>线程2 退出
>线程1 退出

Ox15 多线程封装

  • 上面的多线程锁还是面向函数式编程的
  • 让我们来封装一下,改成面向对象的吧
# -*- coding: utf-8 -*-
# -*- coder : CO0kie丶 -*-
# -*- time  : 20201113 -*-
import threading
from time import sleep


class MyThread(threading.Thread):
    def __init__(self, lock, thid):
        threading.Thread.__init__(self)
        self.lock = lock
        self.name = f'线程{thid}'
        self.start()

    def print(self, text: str):
        self.lock.acquire()
        print(f'>>{self.name}:{text}')
        self.lock.release()

    def run(self):
        global num
        self.print(f'传入 {self.name} {num}')
        for i in range(num):
            self.print(f'计数{i + 1}/{num}')
            sleep(0.01)
        self.print('退出')


class ThreadCtrl(object):
    def funinit(self, thid):
        return MyThread(self.lock, thid + 1)

    def __init__(self, threadNum: int):
        self.lock = threading.Lock()
        self.Threads = list(map(self.funinit, range(threadNum)))
        pass


if __name__ == '__main__':
    num = 5
    ThreadCtrl(2)
  • 运行结果
>>线程1:传入 线程1 5
>>线程1:计数1/5
>>线程2:传入 线程2 5
>>线程2:计数1/5
>>线程1:计数2/5
>>线程2:计数2/5
>>线程1:计数3/5
>>线程2:计数3/5
>>线程1:计数4/5
>>线程2:计数4/5
>>线程2:计数5/5
>>线程1:计数5/5
>>线程1:退出
>>线程2:退出

Ox16 多线程队列

  • 多线程已经封装成类了
  • 那我们把队列给加进去
# -*- coding: utf-8 -*-
# -*- coder : CO0kie丶 -*-
# -*- time  : 20201113 -*-
import threading
import queue
from time import sleep
from functools import reduce


class MyThread(threading.Thread):
    def __init__(self, _lock, _thid, _que):
        threading.Thread.__init__(self)
        self.lock = _lock
        self.thid = _thid
        self.que = _que
        self.max = _que.qsize()

    def print(self, text: str):
        self.lock.acquire()
        print(f'>>{self.name}:{text}')
        self.lock.release()

    def run(self):
        self.print(f'传入 {self.thid};队列数 {self.max}')
        sleep(0.01)
        while not self.que.empty():
            task = self.que.get()
            self.print(f'计数{task}/{self.max}')
            sleep(0.01)
        self.print('退出')


class ThreadCtrl(object):
    def funinit(self, thid):
        return MyThread(self.lock, thid + 1, self.que)

    def __init__(self, _threadNum, _que):
        self.que = _que
        self.lock = threading.Lock()
        self.Threads = list(map(self.funinit, range(_threadNum)))

    def start(self, _bJoin=True):
        # 循环启动线程
        for th in self.Threads:
            th.start()

        # 循环等待线程返回
        if not _bJoin:
            return
        for th in self.Threads:
            if th.is_alive():
                th.join()
        return


if __name__ == '__main__':
    # 初始化队列
    que = queue.Queue()
    # 循环加入队列
    for i in range(10):
        que.put(i + 1)
    # 初始化线程,并启动
    ThreadCtrl(2, que).start(True)
    print('>主线程结束')
    pass
  • 运行结果
>>Thread-1:传入 1;队列数 10
>>Thread-2:传入 2;队列数 10
>>Thread-2:计数1/10
>>Thread-1:计数2/10
>>Thread-2:计数3/10
>>Thread-1:计数4/10
>>Thread-1:计数5/10
>>Thread-2:计数6/10
>>Thread-1:计数7/10
>>Thread-2:计数8/10
>>Thread-1:计数9/10
>>Thread-2:计数10/10
>>Thread-1:退出
>>Thread-2:退出
>主线程结束

Process finished with exit code 0

Ox17 线程池应用

'''挖个坑,待更新'''

Ox00 课堂笔记

Ox01 文件操作

# 使用 Python 内置函数完成文件操作

# 参数一是文件所在的路径,参数二是打开方式,可能需要注意 encoding
file = open('content.txt', 'w+')

# 向文件的内部写入数据
file.write('hello 15pb')
file.writelines(['第一行\n', '第二行\n', '第三行\n'])

# 关闭文件,将对文件的修改保存到硬盘
file.close()

# 通过 with open() as name 的形式打开一个文件
with open('content.txt', 'r') as file:
    # 将 open 函数的返回值保存到变量 file 中
    # 对于文件对象,在离开 with 作用域的时候
    # 会自动的进行关闭,只需要关注逻辑部分
    print(file.read())          # 默认读取所有内容
    print(file.readline())      # 一行最大字符个数
    print(file.readlines())     # 读取的最多行数

# 以只读方式打开文件的时候,如果文件不存在就会产生异常
try:
    with open('content2.txt') as file:
        pass
except Exception as e:
    print(e)

# 更多的函数: file.seek + file.tell -> 计算文件大小

# 文件操作模块: os,os 的使用方式和 C 语言的函数完全一致

Ox02 结构体操作

# 通过 struct 可以实现字节流到结构体的转换
import struct

# 打包成 int + int + char[1024] 的结构体
bytes_content = struct.pack('ii1024s', 10, 20, b'hello15pb')
print(bytes_content)

# 将 nt + int + char[1024] 的结构体进行解包
type, length, content = struct.unpack('ii1024s', bytes_content)
print(type, length, content.decode('utf8').strip('\0'))

Ox03 线程模块

# 使用 threading 可以实现线程操作(伪线程)
import threading
import time

# 不带参数的线程回调函数
def worker_thread():
    for i in range(1000):
        # current_thread 表示的始终都是当前的线程
        print(threading.current_thread().name, i)
        time.sleep(0.1)

# 模拟主函数的使用
def main():
    # 通过 Thread 函数创建一个线程,需要指定起始位置(函数)
    t = threading.Thread(target=worker_thread, name='worker_thread')
    # Thread 创建的实际是线程对象,默认并没有运行,需要调用 start 函数
    t.start()

    for i in range(1000):
        # current_thread 表示的始终都是当前的线程
        print(threading.current_thread().name, i)
        time.sleep(0.1)

    # 通常,为了确保主线程退出之前,所有其它线程执行完毕,需要等待
    t.join()

if __name__ == '__main__':
    main()

Ox04 线程传参

import threading


# 线程回调函数,带参
def thread1(arg1, arg2, arg3):
    print(arg1, type(arg1))
    print(arg2, type(arg2))
    print(arg3, type(arg3))


# 线程回调函数,带元组变参
def thread2(*args):
    print(args, type(args))


# 线程回调函数,带字典变参
def thread3(**kwargs):
    print(kwargs, type(kwargs))


# 线程回调函数,带元组和字典变参
def thread4(*args, **kwargs):
    print(args, type(args))
    print(kwargs, type(kwargs))


# 创建多个线程,传递相应的参数
threading.Thread(target=thread2, args=(1, 2, 3)).start()
threading.Thread(target=thread3, kwargs={"a": 1, "b": 2, "c": 3}).start()
threading.Thread(target=thread4, args=(1, 2, 3), kwargs={"a": 1, "b": 2, "c": 3}).start()
threading.Thread(target=thread1, args=(1, 2, 3)).start()

Ox05 线程的锁

# 使用 threading 可以实现线程操作(伪线程)
import threading
import time


# 定义一个全局的锁
lock = threading.Lock()


# 不带参数的线程回调函数
def worker_thread():
    for i in range(1000):
        lock.acquire()
        # current_thread 表示的始终都是当前的线程
        print(threading.current_thread().name, i)
        time.sleep(0.1)
        lock.release()


# 模拟主函数的使用
def main():
    # 通过 Thread 函数创建一个线程,需要指定起始位置(函数)
    t = threading.Thread(target=worker_thread, name='worker_thread')
    # Thread 创建的实际是线程对象,默认并没有运行,需要调用 start 函数
    t.start()

    for i in range(1000):
        lock.acquire()
        # current_thread 表示的始终都是当前的线程
        print(threading.current_thread().name, i)
        time.sleep(0.1)
        lock.release()

    # 通常,为了确保主线程退出之前,所有其它线程执行完毕,需要等待
    t.join()


if __name__ == '__main__':
    main()

Ox06 数据库操作

# 通过 pymysql 连接并操作 mysql 数据库
import pymysql


class Mysql(object):

    def __init__(self, database_name):
        try:
            # 通过 connect 函数传入数据库的配置信息连接到数据库
            self.connect = pymysql.connect(host='127.0.0.1', user='root',
                password='123456', port=3306, database=database_name)
            # 一旦数据库连接成功,我们就需要获取到游标对象
            self.cursor = self.connect.cursor()
        except Exception as e:
            print('error', e)

    def insert(self, sql: str):
        try:
            self.cursor.execute(sql)
            # 对于所有修改数据库的操作,都需要提交
            self.connect.commit()
        except Exception as e:
            self.connect.rollback()
            print('error', e)

    def select(self, sql: str):
        try:
            self.cursor.execute(sql)
            # 从数据库中获取到查询的结果集,返回的是一个元组,
            # 元组中的每一个元素表示一行,保存的是一行中的每一列
            result = self.cursor.fetchall()
            count = self.cursor.rowcount
            # 将查询到的行数和内容进行打包,返回给调用方
            return count, result
        except Exception as e:
            print('error', e)


if __name__ == '__main__':
    sql = Mysql('student')
    sql.insert("INSERT INTO stu_class VALUE(10, 'ten')")
    print(sql.select('select * from stu_class;'))

Ox07 客户端程序

from socket import *


def main():
    # 1. 创建套接字对象
    client = socket(AF_INET, SOCK_STREAM)

    # 2. 等待客户端的连接
    client.connect(('127.0.0.1', 0x1515))

    # 3. 收发数据
    print(client.recv(100).decode('utf-8'))

    # 6. 关闭套接字
    client.close()


if __name__ == '__main__':
    main()

Ox08 服务端操作

from socket import *


def main():
    # 1. 创建套接字对象
    server = socket(AF_INET, SOCK_STREAM)

    # 2. 绑定对象到指定的ip和端口
    server.bind(('127.0.0.1', 0x1515))

    # 3. 开启套接字的监听状态
    server.listen(SOMAXCONN)

    # 4. 等待客户端的连接
    client, address = server.accept()

    # 5. 收发数据
    client.send('welcome'.encode('utf-8'))

    # 6. 关闭套接字
    client.close()
    server.close()


if __name__ == '__main__':
    main()

Ox09 聊天室

'''挖个坑,待更新'''

Ox20 引用

【Python】 多线程并发threading & 任务队列Queue - K.Takanashi - 博客园

https://www.cnblogs.com/franknihao/p/6627857.html

python--threading多线程总结 - 苍松 - 博客园

https://www.cnblogs.com/tkqasn/p/5700281.html

Python多线程threading进阶笔记 - 刘元涛的个人页面 - OSCHINA

https://my.oschina.net/liuyuantao/blog/1154194

python多线程、锁、event事件机制的简单使用 - 简书

https://www.jianshu.com/p/8301f1083d5e

细说Python的lambda函数用法,建议收藏 - 知乎

https://zhuanlan.zhihu.com/p/80960485


  • END