Obelieve's Blog  >  All tags  >  Python

Celery 分布式任务队列

经纪人(broker)🔗

需要发送消息/接收消息,通过第三方消息代理软件,来实现。

  • RabbitMQ
  • Redis
#RabbitMQ安装
#Linux系统
sudo apt-get install rabbitmq-server
#Docker
docker run -d -p 5672:5672 rabbitmq
#Redis安装
docker run -d -p 6379:6379 redis

安装🔗

pip install celery

简单例子🔗

#task.py
from celery import Celery

#broker 代理消息的软件 、backend 保存任务结果的后端
app = Celery('tasks', broker='pyamqp://guest@localhost// ',backend='rpc://')  
#rabbitMQ  pyamqp://guest@localhost// 
#redis redis://localhost:6379/0
@app.task  
def add(x, y):  
    return x + y

运行工作服务器🔗

celery -A task worker --loglevel=INFO 

调用任务🔗

  • 使用delay调用任务
from tasks import add
add.delay(4, 4)

保存任务🔗

  • Celery 内置了几个结果后端可供选择: SQLAlchemy / Django ORM, MongoDB 、 Memcached 、 Redis 、 RPC ( RabbitMQ /AMQP)以及 – 或者您可以定义自己的。
#RPC 结果后端 
#backend='rpc://'

from tasks import add
result = add.delay(4,4)
result.ready() #判断结果是否完成
result.get(timeout=1) #获取结果并设置超时1s
result.get() #获取结果 
# result.get(propagate=False) 任务异常时get()也会异常,propagate覆盖操作
  • error
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'. Did you mean: 'get_task_meta_for'?
# 解决 重启调用任务的Python

配置🔗

# 单个配置
app.conf.task_serializer = 'json'
# update 多个配置
app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)
# 配置模块
app.config_from_object('celeryconfig')
celeryconfig.py
###
broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
###
python -m celeryconfig # 验证配置模块是否正确

Python语法

[TOC]

注:🔗

  • 跨行字符串使用'''表示
  • callable(object)可调用对象判断函数,类是可调用对象,实例需要实现__call__方法
  • if条件句变式
    • <A> if <B> else <C> 默认返回A,如果B为False返回C
    • if <A> is not <B>: 字面意思A不是B,为True
    • if not <A>: A是空(False),为True
  • 一个类中,相同方法名,后面方法覆盖前面方法
  • 魔法方法
    • __setattr__ 拦截赋值 重写该方法,当实例对属性进行赋值时,会调用该方法
    • __getattr__ 拦截运算 重写该方法,当实例直接引用不能存在的属性时,会调用该方法
  • // :做除法并返回不大于结果的整数
  • [cls(**r) for r in rs],cls表示类接收一个关键字参数,rs表示迭代器。 通过迭代器把关键字参数传入到cls中,然后返回一个list对象

1.执行模式🔗

1.交互式模式(直接输入,并解释)🔗

>>>print("hello world")

2.命令行模式🔗

  • 2.1内置函数:

    • I/O input() print()

    • Python3.x使用Unicode编码: ord() #字符转为整数表示 chr() #整数转为字符表示 PS:ord('A') 输出65 chr(666) 输出'ʚ'

    • 编码/解码 "ascii" 是bytes(字节方式,b前缀),在bytes中,无法显示为ASCII字符的字节,用\x##显示。 "utf-8" encode() #用什么编码, decode() #用什么解码 PS: ‘A’.encode('ascii')输出b'A'

    • 其他 len() #字符/字节长度 format() #格式化输出 help(<func_name>) #查看函数用法

    • 重新加载模块

    import importlib
    importlib.reload(<module_name>)
    

2.语法-关键字🔗

2.1变量类型任意-动态语言,变量类型固定-静态语言🔗

  • 1.语句缩进风格,大小写区分
  • 2.数据类型: 浮点型:"2e5"表示 2*10的5次方,"2e-5" 表示2*10的-5次方 布尔型:"True" 或 "False" 逻辑符号: "and"(与)、"or"(或)、"not"(非) 空值:"None"
  • 3.除法:"//"表示除法最后结果只取整数部分

2.2格式化输出🔗

  • 1.使用 “%” PS: print("Hello,%s"%'World',"ok%d"%3) 输出 Hello,World ok3
  • 2.使用format(),使用占位符{0},{1}... PS:print("{0},{1}".format('Hello','World'))

2.3 list(可变数组 <中括号> [])和tuple(不可变数组 <小括号> ())🔗

  • list相关
    • 1.list定义:
    num=[1,2,3]
    num=[] (空数组)
    
    • 2.访问list:
    num[0]输出 1
    num[1]输出 2
    num[-1]输出 3
    num[-2]输出 2
    num[-3]输出 1
    num[-4]报错
    
    • 3.增加list元素:
    num.append("a") 输出 [1,2,3,'a']
    num.insert(1,1.1) 输出 [1,1.1,2,3,'a']
    num.insert(-1,'last_1') 输出[1,1.1,2,3,'last_1','a']
    
    • 4.删除list元素:
    num.pop()输出[1,1.1,2,3,'last_1']
    num.pop(1)输出[1,2,3,'last_1']
    num.pop(-1)输出[1,2,3]
    
  • tuple相关
    • 1.tuple定义:
    #没有"增加/删除"方法,数组不变
    num=[1,2]
    finalNum=(1,2,num)
    finalNum=()(空数组)
    如果定义finalNum=(1),python防止和表达式歧义,
    规定finalNum变量表示数字1。
    可以这样表示 finalNum=(1,)
    
    • 2.访问tuple:
    finalNum[0]输出 1
    finalNum[1]输出 2
    finalNum[-1]输出 [1,2]
    finalNum[-1][0] 输出 1
    

2.4 条件判断🔗

  • 关键字 "if"、"else"、"elif"(else if 缩写)、":"(冒号)
  • 格式
    <条件判断>:非零数值、非空字符串、非空数组 为True,否则 False
    if<条件判断>: #要加冒号 ":"
    	<执行>
    elif<条件判断>:
    	<执行>
    else:
    	<执行>
    
  • 字符串不能和数字比较
    b = input() #input()返回是字符串类型,需要转为数字
    b = int(b) #int()方法,转为整数
    b>100
    

2.5 循环🔗

  • for <指代变量> in <数组>:
    range(100) #返回整数序列的range对象
    list(range(100)) #转为数组表示[1,2,...,99]
    
  • while <条件>:<缩进> <执行> break 退出循环 continue 停止当前次执行,继续循环判断

2.6 数据集合:dict 和 set🔗

  • dict <大括号> {} (dictionary) Java中的map

    d={'a':1,'b':2,"c":3}
    d['a'] 输出 1
    d['a'] ='1a' #赋值
    
    • 是否存在key,返回 True 或 False
    `<key> in <dict>`
    `<dict>.get(<key>)` #不存在返回 None
    `<dict>.get(<key>,<defaultValue>)` #不存在返回 <defaultValue>
    
    • 删除 <dict>.pop(<key>)
  • set <小括号> ()

    定义	:需要list数组
    a = [1,2,3]
    b=set(a) 或 b =set([1,2,3]) 
    
    • add <set>.add(<key>)
    • remove <set>.remove(<key>)
    • 逻辑操作 &|-
    s1=([1,2,3])
    s2=([2,3,4])
    s1 & s2 输出 {2,3}
    s1 | s2 输出 {1,2,3,4}
    s1 - s2 输出 {1}
    

2.7 函数🔗

  • 1.函数定义: def <method_name>(params):
  • 2.空函数: def <method_name>(): pass
  • 3.return:
    • 1.return表示结束,等价于 return None
    • 2.不写默认返回 return None
    • 3.返回tuple类型可省略(),PS: return (x,y) 变为 return x,y
  • 4.导入模块(.py文件):
    • 1.引入整个模块:import math 说明导入math包 PS:自定义文件tri.py,import tri引入tri文件到内存 通过 tri.<mothod>()访问
    • 2.导入模块中的某个方法:
      • 某个函数: from <文件名> import <函数名>
      • 全部函数:from <文件名> import *
  • 5.参数:
    • 必选参数
    • 1.默认参数,参数=
    • 默认参数必须指向不变对象,如果值是[],那么每次调用都是原来的[]
    def power(x,n=2):
    	pass
    调用方式:
    1.power(5),默认n=2
    2.power(5,3)
    
    • 2.可变参数: *params
    def count(*params):
    	sum = 0;
    	for p in params:
    		sum = sum +p
    	return sum
    调用方式:
    1.count(1) 输出 1
    2.count(1,2,3) 输出 6
    3.count(*[1,2,3]) 输出 6
    
    • 3.关键字参数: **params
    def student(name,age,**params):
    	print("name",name,'age',age,'params',params)
    调用:
    1.student('a',12) 输出  name a age 12 params {}
    2.student('b',12,a=122,b=1223) 输出 name b age 12 params {'a': 122, 'b': 1223}
    3.student(1,2,**{'a':2}) 输出 name 1 age 2 params {'a': 2}
    
    • 4.命令关键字参数: *,<key1>,<key2> (PS:如果前面有可变参数,可不用加*,)
    def student(name,age,*,gender,city):
    	print('name',name,'age',age,gender,city)
    调用:
    1.student('a',110) 输出 TypeError..
    2.student('a',123,gender='male',city='fz') 输出 name 1 age 2 male fz
    
    • 5.参数组合:顺序- 必选参数->默认参数->可变参数->命令关键字参数->关键字参数
    def s(a,b,c=0,*d,city,**kw):
    	print('a=',a,'b=',b,'c=',c,'d=',d,'city=',city,'kw=',kw)
    调用:
    1.s(1,2,city=22)
    输出 a= 1 b= 2 c= 0 d= () city=22 kw= {}
    2.s(1,2,3,4,5,city=22,a=1,b=2)	
    输出 a= 1 b= 2 c= 3 d= (4, 5) city= 22 kw= {'d': 1, 'e': 2}
    3.s(1,2,3,4,5) 输出TypeError:missing city
    

2.8高级特性🔗

  • 1.切片(Slice):取数组元素的简便方式
    • 默认是索引0开始并且递增速度1,可反向索引
    • [0:3][:3] 取数组 [0],[1],[2]
    • [-1:] 取数组 [2]
    • [:] 取全部
    • [::5] 在全部中,每个5个取一个
    • [:10:2] 在数组0~9的元素中每隔2个取一个
  • 2.列表生成式:列表的每个元素进行一次表达式计算
    • 格式: [<变量的表达式> for <变量> in <序列>]
    def L2(L):
    	L2 = []
    	for x in L:
    		L2.append(x*x)
    	return L2
    调用: [1,2,3] ->[1,4,9]
    1.sum2(list(range(1,4)))
    2.列表生成式: [x*x for x in range(1,4)]
    
  • 3.生成器(Generator):动态生成列表的每个元素,可通过next(<Generator>)进行访问
    • 1.列表表达式改为小括号 (<变量的表达式> for <变量> in <序列>)
    • 2.设置函数中的yield,变成一个Generator,函数每次调用返回一个新的Generator
    • g.send(<参数>):传入值,并调用next(g),g=Generator
    • return时,StopIteration
    杨辉三角: C(n,i)=C(n-1,i-1)+C(n-1,i)
    递归关系式 (n>1,1<=i<=n n表示行,i表示第n行的第i个数) C(1,1)=1
    	def triangles():
    	L = [1]
    	while True:
    		yield L
    		L = [sum(i) for i in zip([0]+L,L+[0])] #补[0]作用:行项+1,每个项是[n-1,n]队列
    

2.9 函数式编程🔗

  • 1.定义:一种抽象程度很高的程序范式,纯粹的函数式编程没有变量。任意函数输入确定,那么输出 也确定,称为没有副作用。对于有变量的相同输入,可能有不同输出,称为有副作用。

  • 2.高阶函数:

    • 特点:
      • 1.允许函数作为一个变量、函数返回值
      • 2.函数名变量:函数名作为变量
      • 3.函数参数:可接收另一个函数作为参数
    • map/reduce
      • map(<method_name>,<Iterable>) return <Iterator>,需要list(<Iterator>)变为列表
        其中<Iterable>的每个元素都被<method_name>作用一次
      • reduce(<method_name>,<Iterable>)
        • 1.需要引入 from functools import reduce
        • 2.<method_name>需要2个参数, 返回的值继续和<Iterable>的下一个值进行函数计算
    • filter:filter(<method_name>,<Iterable>)map函数类似 不同的是<method_name>根据返回True/False,<Iterable>过滤False的item
    • sorted: sorted(<Iterable>,**kw)<Iterable>进行排序 其中**kw,可选择接受 key=<method_name>(PS:abs,str.lower)自定义排序, reverse=True反向排序
  • 3.返回函数:

    • 返回函数,需要变量进行函数调用,每次调用返回新的函数
    • 闭包:能够读取函数内部变量的函数(定义在函数内部的函数) 返回函数不要引用引起变化的局部变量
  • 4.匿名函数:lambda <变量>:<表达式>只是个表达式,可被变量引用

  • 5.装饰器:在原有函数定义不需要改变下,增加新内容并返回函数的引用

    • __name__函数名称
    • 1.@ 修饰符
    def log(f):
    	@functools.wraps(f)
    	def wrapper(*args,**kw):
    		print('wrapper %s func'%f.__name__)
    		return f(*args,**kw)
    	return wrapper
    @log
    def now():
    	print('now')
    其中,@log修饰 等价于  now = log(now)
    
    • 2.原始函数的一些属性复制到装饰器函数
      • import functools
      • @functools.wraps(<func_params>)
  • 6.偏函数:functools.partial(<method_name>,*args,**kw) 其中*args在前**kw在后

    import functools
    int('100',2) 输出4
    int('100',base=2) 输出4
    int2 = functools.partial(int,base=2) #默认int函数是10进制
    int2('100')输出4
    PS:
    int2 = functools.partial(int,2)?
    int2('100') 输出 TypeError,因为调用int2('100') 等价于 int(2,'100')
    
    functools.partial源码
    def partial(func, *args, **keywords):
    	def newfunc(*fargs, **fkeywords):
    		newkeywords = keywords.copy()
            newkeywords.update(fkeywords)
            return func(*args, *fargs, **newkeywords)
        newfunc.func = func
        newfunc.args = args
        newfunc.keywords = keywords
        return newfunc
    

2.10 使用模块🔗

  • 1.导入模块: PS:import sys 就可以使用sys变量来访问模块全部内容
  • 2.模块内部:
    • 1.模块特殊变量__name__,当运行当前模块文件__name__ ==__main__是True,否则False

      hello.py
      
      def test():
      	print("hello world")
      if __name__ =='__main__'
      	test()
      当 python hello.py
      	输出 hello world 
      当 import hello 将不会执行 test()
      
    • 2.类似__xxx__这样的变量是特殊变量,可以被直接引用,但是有特殊用途

    • 3.类似_xxx和__xxx这样的变量或函数就是非公开的(private 模块内部使用),规定不能访问

    • (只是内部改变了变量命名名称,python无法完全限制访问)

  • 3.安装第三方模块:pip install <module_name>
    • 添加模块搜索路径
    import sys
    sys.path 查看模块
    sys.path.append("路径")
    
    • 设置PYTHON_PATH

2.11 面向对象编程🔗

  • 1.类和实例
    • 定义: class <class_name>(object): 表示<class_name>继承自object
    • __init__(self,params) 对象创建时,如果必须传递的参数params,可以使用该方法
    • 类的方法,第一个参数固定是self,其他和函数定义一样
  • 2.访问限制
    • __xx__标识特殊变量,可以访问
    • __xx标识变量,表示private,不允许访问
    • _xx标识变量,约定成俗,表示private (只是python解释器会把这种变量解释成不一样名称防止外部直接访问)
  • 3.继承和多态
    • class <class_name>(<继承的类>),继承父类方法和父类中__init__方法设置的变量,覆写父类方法 -鸭子类型(file-like object),只有变量对象有这个方法变量就可以,不管这个对象是否是子类
  • 4.获取对象信息
    • type(<参数>)函数:获取参数的类型。

    • import types 导入types模块 一些类型,可进行判断 types.FunctionType types.BuiltinFunctionType types.LambdaType types.GeneratorType

    • isinstance(变量,类型),可用于判断自定义的类,方法类型 isinstance(变量,(类型1,类型2)),可用于判断是否是类型1类型2之一

    • dir(<对象实例>),获取对象所有属性和方法

      • len('ABC')等价于'ABC'.__len__()
      • hasattr(<对象实例>,<属性>),判断是否存在这个<属性>
      • setattr(<对象实例>,<属性>,[属性默认值]),设置<属性>
      • getattr(<对象实例>,<属性>),获取<属性>
    • 类属性和实例属性

    class A(object):
    	name = 'A' #类属性,直接通过 A.name访问
    	def __init__(self,a):
    	 	self.a =a #实例属性
    

2.12 面向对象高级编程🔗

  • 1.动态绑定变量方法

    from types import MethodType 
    def hao():
    	print('hh')
    class A(object):
    	pass
    a = A()
    a.name='aa' #动态绑定变量,只作用当前实例
    a.hao=MethodType(hao,a) #动态绑定方法,只作用当前实例
    A.hao=hao #类的所有实例绑定方法
    print(a.name)
    
  • 2.使用__slots__ 限制实例绑定的属性

    class A(object):
    	__slots__=('name','age') #tuple类型,表示只允许添加name和age的属性
    class A2(A):
    	pass
    a = A()
    a.name='a'
    a.age='b'
    a.sc='c'#AttributeError
    a = A2()
    a.sc = 'c' #子类,不受影响
    #如果A2增加`__slots__=('sc')`那么A2将限制包括父类的name,age和sc,3个属性
    
  • 3.@property,修饰一个方法,用于属性直接可读操作, 并且生成@<方法名>.setter进行直接可写操作。 在不用调用(set/get)方法下,直接操作属性并进行检查判断。(等价于动态绑定属性+属性检查)

    class A(object):
    
    	@property
    	def score(self):
    		return self.__score
    	@score.setter
    	def score(self,value):
    		if not (isinstance(value,int)):
    			raise ValueError('score must be an integer!')
    		if (value<0 or value>100):
    			raise ValueError('score between 0 and 100!')
    		self.__score=value
    #可以通过直接访问方式
    a = A();
    a.score=10
    print(a.score)
    a.score=101 #ValueError
    
  • 4.多重继承

    • MixIn:一个类实现多个功能,可通过多继承方式
    class A(object):
    	def run(self):
    		print('A')
    	def run1(self):
    		print('run1() A')
    class B(object):
    	def run(self):
    		print('B')
    class C(B,A):
    	pass
    #执行
    a = C()
    a.run()
    a.run1()
    
  • 5.定制类,类中特殊函数覆写

    • __str__: print(<类对象>)中打印的内容
    • __repr__: 打印类对象的内容
    • __iter__:设置为迭代对象用于for ... in循环,实现__next__方法
    • __getitem__:跟获取数组下标一样获取数据项,<类对象>[0]、<类对象>[1]
      • <类对象>[:]、<类对象>[:2]也可以切片对象, 那么需要判断处理isinstance(<变量>,slice)
    • __getattr__:覆写,那么py会先根据这个方法尝试获取属性
    • __call__:类对象可以作为函数调用使用,(PS: s是类对象,那么可以s()进行调用)
      • callable(<类实例>):判断类对象是否作为可调用的函数
  • 6.枚举类

    • 定义: from enum import Enum 索引引用[]从1开始
    • Enum(<枚举类型名称>,(<枚举1>,<枚举2>))
    • class A(Enum): 枚举自定义派生类
    from enum import Enum
    w = Enum('weekly',('Sun','Mon','Tue','Wed','Thu','Fri','Sat'))
    w(1) #输出 <weekly.Sun:1>
    w(1).name #输出'Sun'
    w(1).value #输出1
    
    from enum import unique
    @unique #防止重复值	
    class A(Enum):
    	Sun=0 
    	Mon=1
    	Tue=2
    	Wed=3
    	Thu=4
    	Fri=5
    	Sat=6
    A.Sun.name #输出'Sun'
    A.Sun.value #输出0
    
  • 7.使用元类

    • type(<类名>,(<父类>,[<父类> 0~n个]),dict(<方法名>=<绑定的方法>)):动态生成class类
    • metaclass
      • 使用:class <类名>(<父类>,metaclass=<metaclass类>) 其中metaclass=<metaclass类>,用于生成<类名>实例 metaclass类 1.继承自type 2.实现__new__(cls, name, bases, attrs)方法实现创建新的实例 cls:<metaclass类> name:创建类的名字 base:创建类继承的父类集合 attr:创建类的属性/方法集合
      class Model(type):
      	def __new__(cls,name,base,attr):
      		print(attr)
      		attr['ok']='model gen ok' #添加属性
      		attr['add']=lambda self,value: self.append(value) #添加方法
      		print(attr)
      		return type.__new__(cls,name,base,attr)
      
      class Test(list,metaclass=Model):
      	pass
      
      if(__name__=='__main__'):
      	a = Test()
      	print(a.ok)
      	a.add(1)
      	print(a)
      
      
  • 8.super(<class_type>,<class_instance>)调用父类函数 按照继承方法的顺序调用,称为MRO(Method Resolution Order)

    class A(object):
    	super(A,self).__init__():
    	print('A')
    class B(object):
    	super(B,self).__init__():
    	print('B')
    class C(A,B):
    	super(C,self).__init__():
    	print('C')
    C.mro() # <class C>,<class A>,<class B>
    C() 
    #输出 B A C
    #C()
    #开始从C调用super()—>
    #下一个指向A,在A中调用super()->
    #下一个指向B,在B中调用super()是object
    #最后,打印B->打印A->打印C 
    
  • 9.静态方法、类方法、实例方法

    • @staticmethod 静态方法 (类/实例)访问,不用写默认参数self,不能调用实例变量、类变量等
    • @classmethod类方法,(类/实例)访问,需要有默认参数self,不能调用实例变量。
    • 实例方法,实例访问,需要有默认参数self

2.13 错误、调试和测试🔗

  • 1.异常捕获
    • try...except...as e ... finally...
    try: 
    <执行>
    except <异常类> as e:
    	<执行>
    finally:
    	<执行>
    
    • logging 记录错误
    • raise 抛出错误
  • 2.日志输出
    • assert代替print 使用python -O <文件名>.py其中 -O大写字母O表示关闭断言 那么,assert将会被pass代替
    • logging 和android logcat类似
    import logging 
    logging.basicConfig(level=logging.INFO) #添加logging显示等级
    
  • 3.单步调试pdb:python -m pdb xxx.py
  • 4.单元测试 import unittest模块
  • 5.文档测试 import re模块

2.14 IO编程🔗

  • 1.文件读写

    • 1.获取文件对象 open(<路径>,'r') 获取文件对象,标识符r=读,w=写 默认读取文本文件
    • 2.读取文件
      • read()一次性读取文件内容
      • read(size)反复调用读取
      • readlines()读取一行
    • 3.关闭文件
      • close()
      • with open(<路径>,'r') as f: print(f.read())会自动调用close(),不用自己调用
    • 二进制文件rb 标识符表示
    • 字符编码,读取非UTF-8文件open(<路径>,'r',encoding='gbk',error='ignore') error='ignore'用于读取编码错误时,忽略处理
    • 4.写文件
      • 标识符:w写文本文件,wb写二进制文件 wa(append)追加形式写入(其他是覆盖方式)
      • write()
      • close()写完需要关闭,也可以用with open(...) as f:方式自动关闭
  • 2.StringIO和BytesIO

    • 1.StringIO
    from io import StringIO
    f = StringIO() #内存中字符操作
    f.write('hello ')
    f.write('world')
    f.getValue()# 'hello world'
    while True:
    	s = f.readlines()
    	if(s==' '):
    		break
    	print(s.strip())#strip()去掉空格
    
    • 2.BytesIO
    from io import BytesIO
    f = BytesIO()
    f = BytesIO(b'\xe5\xad\x97\xe8\x8a\x82')
    f.write('字节'.encode('utf-8'))#写入utf-8编码的字节
    f.close()
    f.read()
    
  • 3.os模块(系统信息,创建目录)

    • import os os.name posix表示类Unit系统(Linux、Unix、Mac OS),nt表示windows os.uname() posix上提供显示系统信息 os.environ()环境变量 os.environ().get(PATH) 某个环境变量 os.path.abspath('.') 当前目录绝对路径 os.path.join('/User','deskDir') 目录下创建新目录 os.path.isdir('x')是否目录 os.path.isfile('x')是否文件 os.mkdir('/User/deskDir')创建新目录 os.rmdir('/User/deskDir')删除目录 os.path.split('/User/deskDir')目录和最后一个/的内容分离 os.path.splitext('/User/deskDir')目录和文件扩展名分离 os.rename('file.txt','newfile.txt')文件重命名 os.remove('file.txt')删除文件
    [x for x in os.listdir('.') if os.path.isfile(x)]#过滤掉非文件内容
    
    • shutil模块 有提供文件复制函数
  • 4.序列化

    • pickle模块,序列化和反序列化
    import pickle
    d = [1,2,3]
    #序列化
    b=pickle.dumps(d) #生成序列化bytes
    f = open('file.txt','wb')
    pickle.dump(f,b) #生成序列化bytes,并写入到文件
    f.close()
    #反序列化
    f.open('file.txt','rb')
    pickle.loads(b) #反序列化bytes
    pickle.load(f) #反序列化文件内容
    f.close()
    
    • JSON
    {}  		<-->  dict
    []  		<-->  list
    "string"  	<-->  str
    1234.56  	<-->  int或float
    true/false  <-->  True/False
    null 		<-->  None
    
    import json
    d = dict('name':'n','age':12)
    j = json.dumps(d) #返回json
    json.loads(j) #json转为类型
    
    • class对象 json和反json json.dumps(<class对象>,default=<转换函数>) 转json json.loads(<json数据>,object_hook=<转换函数>) 反json
    import json
    def student2dict(stu):
    	return {'name':stu.name,'age':stu.age}
    def dict2student(d):
    	return Student(d['name'],d['age'])
    s = Student('a',22)
    j = json.dumps(s,default='student2dict') #class对象转json
    json.loads(j,object_hook=dict2student) #json->dict对象->转student对象
    
  • 5.进程和线程

    • 多进程

      • fork()在类Unixt系统中,调用1次返回2次,分别是父进程返回和子进程返回。
      • getpid()当前进程id
      • getppid()获得父进程id 子进程返回0 父进程返回子进程id
      import os
      pid = os.fork() #Windows没有fork调用
      if pid == 0 : #子进程
      if not pid ==0 : #父进程
      
      • multiprocessing 跨平台多进程模块
      from multiprocessing import Process
      import os 
      def run_child_proc(name):
      	print('name %s'%name)
      if(__name__=='__main__'):
      	#子进程执行函数run_child_proc,args是dict类型所以加,
      	p = Process(target=run_child_proc,args=('test',)) #创建进程
      	p.start() #子进程开始启动
      	p.join() #等待子进程p执行完成后,用于进程协调同步
      	print('child proc closed')
      
      • Pool 多进程
      from multiprocessing import Pool
      import os,time
      def run_child_proc(name):
      	time.sleep(1 * 1)
      	print('child proc name %s'%name,' pid:',os.getpid())
      
      if(__name__=='__main__'):
      	print('parent Process pid = %s'%os.getpid())
      	p = Pool(3)
      	for i in range(3):
      		p.apply_async(run_child_proc,(i,))#多进程异步执行,进程池限制3个进程 默认4个
      	print('Wait')
      	p.close() #不允许添加新的进程
      	p.join() #等待进程全部执行完成
      	print(' All child proc closed! ')
      
      • 子进程subprocess模块
      • 进程间通信通过Queue、Pipes实现
    • 多线程

      • _threadthreading模块
      import threading
      
      def new_thread_run():
      	print('threading:%s'%threading.current_thread().name)
      print('main thread:%s'%threading.current_thread().name)
      #不要name,py会自动起名字
      t = threading.Thread(target=new_thread_run,name='new_thread')
      t.start()
      t.join()
      print('main thread:%s closed!'%threading.current_thread().name)
      
      • Lock 线程上锁
      import threading,time
      value=0
      lock = threading.Lock() #获得锁对象
      def changeValue():
      	global value
      	value=value+1
      	time.sleep(0.0001)
      	value=value-1
      
      def new_thread_run():
      	lock.acquire() #获取锁
      	for i in range(1000):
      		changeValue()
      	print(threading.current_thread().name,'value',value)
      	lock.release() #释放锁
      t1 = threading.Thread(target=new_thread_run)
      t2 = threading.Thread(target=new_thread_run)
      t1.start()
      t2.start()
      t1.join()
      t2.join()
      print(threading.current_thread().name,'value',value)
      print('main thread closed !')
      
    • ThreadLocal:每个线程单独存储线程内部使用的变量

    import threading
    
    global_local = threading.local() #线程本地变量
    def process_t():
    	print('thead: %s tag:%s'%(threading.current_thread().name,global_local.tag))
    def new_thread_run(tag):
    	global_local.tag = tag
    	process_t()
    t1 = threading.Thread(target=new_thread_run,args=('OK',))
    t2 = threading.Thread(target=new_thread_run,args=('HAO',))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('main thread closed !')
    	
    
      - 多任务模型,切换作业有代价,就是保存/恢复现场
      - Nginx 支持异步IO的Web服务器
      - 协程:Python语言中,单线程的异步编程模型
    
    • 分布式进程:multiprocessing的子模块managers支持

2.15 正则表达式🔗

  • import re:导入re模块
  • r:正则表达式前面加r忽略python中字符串的转义(PS:\\== r\,字符串中输入\要转义)
  • re.match(<正则表达式>,<需要匹配的字符串>):匹配
  • re.compile(<正则表达式>):预编译
  • groups()获得所有匹配的组
  • group(0)默认匹配的组
  • group(1),group(2)表示要提取的组1,2 正则表达式中用()表示提取
  • 正则表达式匹配过程(对于经常匹配的,预编译可以提高效率)
    • 1.编译正则表达式,如果正则表达式不合法,报错。
    • 2.用编译后的正则表达式去匹配字符串

2.16 内建模块🔗

  • 1.datetime 处理日期和时间标准库
    • from datetime import datetime: datetime.now()获取现在时间
    • import datetime: datetime.datetime.now()获取现在时间
  • 2.collections 集合标准库
  • 3.base64
  • 4.struct 解决bytes和其他数据类型转换的库
  • 5.hashlib 摘要算法库(MD5,SHA1),生成hash值
  • 6.hmac: (数据+密钥)的方式生成摘要hash值
  • 7.itertools:操作迭代对象的库
  • 8.contextlib:上下文对象,提供一些文件资源释放的便捷操作
  • 9.urllib:操作URL的库,http请求之类的
  • 10.xml:xml操作
    • DOM:整个XML一次性读入
    • SAX:边读边解析
  • 11.HTMLParser:解析Html,解析html中文本、图像等

2.17 第三方模块🔗

  • 1.图像处理:切片、旋转、滤镜、模糊、调色板等
    • PIL(python image library):仅支持python2.7
    • Pillow:支持python3.x,+新特性
    • pip install pillow 安装
  • 2.requests 网络访问库
  • 3.chardet 编码检测库
  • 4.psutil(process system utilies) 获取系统信息工具库
  • 5.virtualenv 建立隔离的运行环境
    • 每个应用使用不同python版本运行环境处理
  • 6.图形界面库
    • tkinter:python内置的GUI编程库
    • tkwxWidgetsQtGTK:第三方GUI库
    • 海龟绘图库(Turtle Graphics):python内置,通过指挥一只小海龟在屏幕上绘图
      • 起源 LOGO语言:专门给儿童学习编程的语言,特色是通过一只小海龟绘图。
      • import turtle import *

2.18 网络编程🔗

  • TCP/UDP编程
  • import socket 引入socket库

2.19 电子邮件🔗

  • SMTP 发送邮件协议(内置模块)
    • import smtplib:发送邮件模块
    • import email:构造邮件模块
  • POP3 收取邮件协议(内置模块)(POP协议,最新版本号3)
    • 1.import poplib:收邮件模块
    • 2.import email:解析原始文本,构成邮件对象

2.20 数据库🔗

(python中,DB-API通用,数据库操作接口类似)

  • SQLite(内置模块)
    • import sqlite3
  • MySQL
    • 1.安装MySQL
      • windows下,安装选择utf-8
      • linux,mac os下,需要编辑配置文件修改编码,改为utf-8。
        • 配置文件默认存放在/etc/my.cnf或者/etc/mysql/my.cnf
      • PS:MySQL版本>=5.5.3,编码可设置为utf-8mb4(和utf-8兼容), 还可支持Unicode最新标准,可显示emoji字符
    • 2.安装MySQL驱动 pip install mysql-connector-python --allow-external mysql-connector-pythonpip install mysql-connector
  • sqlalchemy:第三方ORM(对象关系映射)框架
    • pip install sqlalchemy

2.21 Web开发🔗

  • JSP:html+java的脚步代码形式

  • HTML

    • HTML 网页
    • CSS(Cascading Style Sheets)层级样式表
    • JS 执行脚本
  • Web应用流程

    • 1.浏览器发送http请求。
    • 2.服务端接收请求,生成html文档。
    • 3.服务端把html文档,作为Body发送给浏览器。
    • 4.浏览器响应,取出Body中的html文档并显示。
    • 现存Http服务器(Apache、Nginx、Lighttpd等):用于接收用户请求,从文件读取html,响应返回。
    • Python用于动态生成html文档
  • WSGI接口(Web Server GateWay Interface)

    • import wsgiref 内置的WSGI服务器
    • ctrl + c终止服务器
    #Web应用程序WSGI处理函数
    #包含2个参数:environ包含http请求信息的dict对象,start_response发送HTTP响应的函数
    #start_response函数包含2个参数:
    #1.响应码,2.一组list表示的Http Header,使用str表示的tuple类型
    #hello.py
    from wsgiref.simple_server import make_server
    def application(environ,start_response):
    	start_response('200 OK',[('Content-Type','text/html')]) #响应Header
    	return [b'<html><body>hello world</body></html>'] #响应Body
    httpd = make_server('',8000,application) #1.ip地址,2.端口,3.处理函数
    httpd.server_forever()#监听请求
    
  • WSGI 第三方框架

    • Flask
    pip install flask #安装flask
    #flask默认在5000端口上
    from flask import Flask
    from flask import request
    app = Flask(__name__)
    @app.route('/',methods=['GET']) 
    def index():
    	return '<h>hello world</h>'
    
    @app.route('/login',methods=['GET']) #/login选项框
    def login():
    	return '''<form action="/login" method="post">
    			<p><input name="username"></p>
    			<p><input name="password"></p>
    			<p><button type="submit">Sign In</button></p>
    			</form>'''
    #flask通过request.form['name']来获取表单的内容
    @app.route('/login',methods=['POST'])#/login输入进行post请求
    def loginP():
    	if request.form['username']=='admin' and request.form['password']=='123456':
    		return '<h>hello admin</h>'
    	return '<h>login error!</h>'
    if(__name__=='__main__'):
    	app.run()
    
    • Django:全能型Web框架
    • web.py:小巧的Web框架
    • Bottle:和Flask类似的Web框架
    • Tornado:Facebook开源异步Web框架
  • html模板:分离html和业务逻辑 MVC模式

    • 模板:独立html文件,内部传入数据变量的格式
    • flask
      • jinja2:默认支持模板
      `pip install jinja2`
      	- `{{name}}`:html中表示变量name
      	- `{%...%}`:html中`...`表示指令写的地方
      
      #模板html存放在目录templates,templates和.py运行文件同级目录下
      #render_template() 显示模板
      #index.html
      <html>
      <body>
      <h>hello world</h>
      </body>
      </html>
      #login.html
      <html>
      <body>
      {% if message %}		
      <p>{{message}}</p>
      {% endif %}
      <form action="/login" method="post">
      	<p><input name="username" value="{{username}}"></p>
      	<p><input name="password"></p>
      	<p><button type="submit">Sign In</button></p>
      	</form>
      </body>
      </html>
      #login_ok.html
      <html>
      <body>
      <h>hello {{name}}</h>
      </body>
      </html>
      #index.py
      from flask import Flask
      from flask import request
      from flask import render_template
      app = Flask(__name__)
      @app.route('/',methods=['GET'])
      def index():
      	return render_template('index.html')
      
      @app.route('/login',methods=['GET'])
      def login():
      	return render_template('login.html')
      @app.route('/login',methods=['POST'])
      def loginP():
      	if request.form['username']=='admin' and request.form['password']=='123456':
      		return render_template('login_ok.html',username='admin')
      	else:
      		return render_template('login.html',message='Error username or password!',username=request.form['username'])
      if(__name__=='__main__'):
      	app.run()
      
      • Mako模板
        • ${name} 变量
        • <%...%> 指令
      • Cheetah模板
        • ${name} 变量
        • <%...%> 指令
      • Django
      - {{name}} 变量
      - {%...%} 指令
      

2.22 异步IO🔗

  • 异步IO模型:利用主线程消息循环。执行IO操作时,只是发出IO指令不等待结果,继续执行其他代码,等待 IO通知后再来处理。

  • 协程:一个线程执行多个函数之间不按顺序调用,切换调用(函数调用一半切换到另一个函数执行)

    • python中通过generator实现了协程。当next(generator), 对生成generator函数进行中断处理,然后执行其他函数
    #同一个线程内,边写边读
    def read():
    	r = ''
    	while True:
    		read = yield r
    		if read:
    			print('read =',read)
    
    def write(r):
    	r.send(None)
    	i=1
    	while i<10:
    		print('write =',i)
    		r.send(i)
    		i=i+1
    
    r = read()
    write(r)
    
  • asyncio:内置异步IO支持

    • @asyncio.coroutine generator标记为coroutine
    • yield from <coroutine>切换协程
    • asyncio.get_event_loop()消息循环
    • loop.run_until_complete(<task()>或asyncio.wait(<tasks>)) tasks=(task(),task())
import threading
import asyncio

@asyncio.coroutine
def hello():
	print('Hello world %s!'%threading.currentThread())
	yield from asyncio.sleep(1) #切换到其他协程延时1秒
	print('Hello again %s!'%threading.currentThread())

loop = asyncio.get_event_loop() #消息循环
tasks=[hello(),hello()]
loop.run_until_complete(asyncio.wait(tasks)) #或者loop.run_until_complete(hello())运行一个
loop.close()

输出:
Hello world <_MainThread(MainThread, started 680)>!
Hello world <_MainThread(MainThread, started 680)>!
#暂停1秒后显示下面
Hello again <_MainThread(MainThread, started 680)>!
Hello again <_MainThread(MainThread, started 680)>!
  • async/await Python3.5以后新的语法
    • @asyncio.coroutine替换为async
    • yield from 替换为await
    import threading
    import asyncio
    
    async def hello():
    	print('Hello world %s!'%threading.currentThread())
    	await asyncio.sleep(1) #切换到其他协程延时1秒
    	print('Hello again %s!'%threading.currentThread())
    
    loop = asyncio.get_event_loop() #消息循环
    tasks=[hello(),hello()]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    
  • aiohttp:基于asyncio实现的HTTP框架
    • pip install aiohttp
    import asyncio
    from aiohttp import web #pip install aiohttp
    
    async def index(request):
    	await asyncio.sleep(0.5)
    	return web.Response(body= ('hello, %s!' % request.match_info['name']))
    
    async def init(loop):
    	app = web.Application(loop=loop)
    	app.router.add_route('GET','/{name}',index)
    	ser =await loop.create_server(app.make_handler(),'localhost',8000)
    	print('Server started at http://localhost:8000 ...')
    	return ser
    loop = asyncio.get_event_loop()
    loop.run_until_complete(init(loop))
    loop.run_forever()