chaoz的杂货铺

生命有息、学无止境、折腾不止

0%

python-peewee

全部数据类型

20210319141915

定义Model,建立数据库

第一种方式:

先定义Model,然后通过db.create_tables()创建或Model.create_table()创建表。

例如,我们需要建一个Person表,里面有name、birthday和is_relative三个字段,我们定义的Model如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from peewee import *

# 连接数据库
database = MySQLDatabase('test', user='root', host='localhost', port=3306)

# 定义Person
class Person(Model):
name = CharField()
birthday = DateField()
is_relative = BooleanField()

class Meta:
database = database
然后,我们就可以创建表了

# 创建表
Person.create_table()

创建表也可以这样, 可以创建多个
database.create_tables([Person])
其中,CharField、DateField、BooleanField等这些类型与数据库中的数据类型一一对应,我们直接使用它就行,至于CharField => varchar(255)这种转换Peewee已经为我们做好了 。

第二种方式:

已经存在过数据库,则直接通过python -m pwiz批量创建Model。
例如,上面我已经创建好了test库,并且创建了Person表,表中拥有id、name、birthday和is_relative字段。那么,我可以使用下面命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 指定mysql,用户为root,host为localhost,数据库为test
python -m pwiz -e mysql -u root -H localhost --password test > testModel.py
然后,输入密码,pwiz脚本会自动创建Model,内容如下:

from peewee import *

database = MySQLDatabase('test', **{'charset': 'utf8', 'use_unicode': True, 'host': 'localhost', 'user': 'root', 'password': ''})

class UnknownField(object):
def __init__(self, *_, **__): pass

class BaseModel(Model):
class Meta:
database = database

class Person(BaseModel):
birthday = DateField()
is_relative = IntegerField()
name = CharField()

class Meta:
table_name = 'person'

操作数据库
操作数据库,就是增、删、改和查。

一、增

直接创建示例,然后使用save()就添加了一条新数据

1
2
3
# 添加一条数据
p = Person(name='liuchungui', birthday=date(1990, 12, 20), is_relative=True)
p.save()

二、删

使用delete().where().execute()进行删除,where()是条件,execute()负责执行语句。若是已经查询出来的实例,则直接使用delete_instance()删除。

1
2
3
4
5
6
7
8
# 删除姓名为perter的数据
Person.delete().where(Person.name == 'perter').execute()

# 已经实例化的数据, 使用delete_instance
p = Person(name='liuchungui', birthday=date(1990, 12, 20), is_relative=False)
p.id = 1
p.save()
p.delete_instance()

三、改

若是,已经添加过数据的的实例或查询到的数据实例,且表拥有primary key时,此时使用save()就是修改数据;若是未拥有实例,则使用update().where()进行更新数据。

1
2
3
4
5
6
7
8
# 已经实例化的数据,指定了id这个primary key,则此时保存就是更新数据
p = Person(name='liuchungui', birthday=date(1990, 12, 20), is_relative=False)
p.id = 1
p.save()

# 更新birthday数据
q = Person.update({Person.birthday: date(1983, 12, 21)}).where(Person.name == 'liuchungui')
q.execute()

四、查

单条数据使用Person.get()就行了,也可以使用Person.select().where().get()。若是查询多条数据,则使用Person.select().where(),去掉get()就行了。语法很直观,select()就是查询,where是条件,get是获取第一条数据。

1
2
3
4
5
6
7
8
9
10
11
12
# 查询单条数据
p = Person.get(Person.name == 'liuchungui')
print(p.name, p.birthday, p.is_relative)

# 使用where().get()查询
p = Person.select().where(Person.name == 'liuchungui').get()
print(p.name, p.birthday, p.is_relative)

# 查询多条数据
persons = Person.select().where(Person.is_relative == True)
for p in persons:
print(p.name, p.birthday, p.is_relative)

1、查询单条数据

我们可以直接使用get()获取单条数据,在参数中传递查询条件。

1
2
3
# 查询name为liuchungui的Person
p = Person.get(Person.name == 'liuchungui')
print(p.name) # 打印出liuchungui

2、查询多条数据

使用select()查询,后面不添加where()是查询整个表的内容。

1
2
3
4
5
# 查询Person整张表的数据
persons = Person.select()
# 遍历数据
for p in persons:
print(p.name, p.birthday, p.is_relative)

我们可以在select()后面添加where()当做查询条件

1
2
3
4
# 获取is_relative为True的数据
persons = Person.select().where(Person.is_relative == True)
for p in persons:
print(p.name, p.birthday, p.is_relative)

我们可以通过sql()方法转换为SQL语句进行查看理解

1
2
3
persons = Person.select().where(Person.is_relative == True)
# 打印出的结果为:('SELECT `t1`.`id`, `t1`.`name`, `t1`.`is_relative` FROM `Person` AS `t1` WHERE (`t1`.`is_relative` = %s)', [True])
print(persons.sql())

使用 filter()函数

1
2
3
4
if __name__ == "__main__":
ps = Person.filter(name='bobby')
for item in ps:
print(item.name) # bobby

3、查询数据条数、排序、Limit

查询数据条数,直接在后面加上count()就行了

1
2
3
4
5
# 查询整张表的数据条数
total_num = Person.select().count()

# 查询name为liuchungui的Person数量, 返回数量为1
num = Person.select().where(Person.name == 'liuchungui').count()

排序,使用的是order_by(),参数内加上按对应字段进行排序

1
2
3
4
5
# 按照创建时间降序排序
persons = Person.select().order_by(Person.create_time.desc())

# 按照创建时间升序排序
persons = Person.select().order_by(Person.create_time.asc())

Limit是使用limit(),传递一个数字,例如2就是获取前两条数据,它可以搭配offset()一起使用

1
2
3
4
5
# 相当于sql语句: select * from person order by create_time desc limit 5
persons = Person.select().order_by(Person.create_time.asc()).limit(5)

# 相当于sql语句中:select * from person order by create_time desc limit 2, 5
persons = Person.select().order_by(Person.create_time.asc()).limit(5).offset(2)

四、查询操作符

20210319140423

其中,==、<、<=、>、>=、!=是很容易理解的,重点提下<<、>>和%。用示例说明:

1
2
3
4
5
6
7
8
# <<使用,查询省份属于湖北和湖南的,对应sql语句:select * from person where province in ('湖南', '湖北')
persons = Person.select().where(Person.province << ['湖南', '湖北'])

# >>使用,查询省份为空的,sql语句: select * from person where province is Null
persons = Person.select().where(Person.province >> None)

# %使用,查询省份中含有 湖 字,sql语句:select * from person where province like '%湖%'
persons = Person.select().where(Person.province % '%湖%')

有时,我们查询条件不止一个,需要使用逻辑运算符连接,而Python中的and、or在Peewee是不支持的,此时我们需要使用Peewee封装好的运算符,如下:

20210319140522

使用示例如下:

1
2
3
4
5
6
# 查询湖南和湖北的, 注意需要用()将Person.province == '湖南'包一层
persons = Person.select().where((Person.province == '湖南') | (Person.province == '湖北'))

# 查询湖南和身高1.75
persons = Person.select().where((Person.province == '湖南') & (Person.height == 1.75))
注意:使用的时候,需要内部还使用()将Person.province == '湖南'包起来,否则不会生效。示例:persons = Person.select().where((Person.province == '湖南') | (Person.province == '湖北'))

除了上面的操作符以外,Peewee还有更多没有重载的操作符,如下:

20210319140550

五、联表查询

有时,我们需要查询两个表中的数据,在Peewee中也可以实现,官方示例如下:

1
2
3
4
query = (Tweet
.select(Tweet.content, Tweet.timestamp, User.username)
.join(User, on=(User.id == Tweet.user_id))
.order_by(Tweet.timestamp.desc()))

上面查询的结果,会在Tweet的Model中添加一个属性user,此时我们可以通过user来访问到查询到的User表信息,如下:

1
2
for tweet in query:
print(tweet.content, tweet.timestamp, tweet.user.username)

Peewee批量插入数据

最近,需要同步数据到Mysql中,数据量有几百万。但是,自己写一个for循环,然后使用Model.create()添加,发现这种方式特别慢。难道,像去年爬数据一样,将几百万的数据从Redis取出来,然后使用多线程进行保存?

在Google上搜索了之后,找到一种更简单的方式,那就是使用Peewee原生的方法insert_many(),进行批量数据插入。

那么,它的速度有多快?

下面,是我简单的比较了插入10000条数据到本地数据库中,四种方式所需要的时间。

第一种,for循环和Model.create()

代码如下:

1
2
3
4
5
6
7
8
9
from xModels import XUser, database
import time

NUM = 10000
start_time = time.time()

users = []
for i in range(NUM):
XUser.create(phone='13847374833', password='123456')

print(“插入{}条数据, 花费: {:.3}秒”.format(NUM, time.time()-start_time))
结果:插入10000条数据, 花费: 10.5秒

第二种,for循环和Model.create(),并放入事务中

代码如下:

1
2
3
4
5
6
7
8
9
from xModels import XUser, database
import time

NUM = 10000
start_time = time.time()

with database.atomic():
for i in range(NUM):
XUser.create(phone='13847374833', password='123456')

print(“插入{}条数据, 花费: {:.3}秒”.format(NUM, time.time()-start_time))
结果:插入10000条数据, 花费: 4.94秒

第三种,使用原生的insert_many()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
from xModels import XUser, database
import time

NUM = 10000
data = [{
'phone': '13847374833',
'password': '123456'
} for i in range(NUM)]

start_time = time.time()

for i in range(0, NUM, 100):
XUser.insert_many(data[i:i + 100]).execute()

print(“插入{}条数据, 花费: {:.3}秒”.format(NUM, time.time()-start_time))
结果:插入10000条数据, 花费: 0.505秒

第四种,使用原生的insert_many()方法,并放入事务中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from xModels import XUser, database
import time

NUM = 10000
data = [{
'phone': '13847374833',
'password': '123456'
} for i in range(NUM)]

start_time = time.time()

with database.atomic():
for i in range(0, NUM, 100):
# 每次批量插入100条,分成多次插入
XUser.insert_many(data[i:i + 100]).execute()

print(“插入{}条数据, 花费: {:.3}秒”.format(NUM, time.time()-start_time))
结果:插入10000条数据, 花费: 0.401秒

结论

insert_many()比使用for+Model.create()方式快很多,在上面例子中快了十倍不止
使用事务,可以些许提升

Peewee使用之事务

常用方法

Peewee实现事务最常用的方法是Database.atomic()方法,非常简单,代码示例如下:

1
2
3
4
5
6
from xModels import XUser, database

with database.atomic() as transaction:
XUser.create(phone='184738373833', password='123456')
XUser.create(phone='184738373833332323232', password='123456')
当事务执行成功之后,它会自动commit(),不需要我们手动调。当事务的代码块中抛出异常时,它会自动调用rollback()。

例如,如果上面的phone设置了长度限制,第二条语句中的phone太长,那么就会抛出异常,然后上面结果是两个用户都不会被添加到数据库中。

注意:上面database是在xModels文件中MySQLDatabase的一个实例,创建方法如下:

1
2
3
from peewee import MySQLDatabase
database = MySQLDatabase('test', **{'charset': 'utf8', 'use_unicode': True, 'host': 'localhost', 'user': 'root', 'password': ''})
除了自动commit()和rollback()之外,我们也可以手动调用commit()和rollback()方法。

例如:

1
2
3
4
5
6
with database.atomic() as transaction:
XUser.create(phone='199999999999', password='123456')
transaction.commit()
XUser.create(phone='188888888888', password='123456')
transaction.rollback()
结果:手动调用了commit(),phone为199999999999的用户成功添加,而188888888888因为rollback(),不会被添加到数据库中。

两种使用方式

Peewee中实现事务有两种使用方式,一种是将atomic当做Context manager使用,另外一种将atomic当修饰器使用。

Context manager

这种方式,就是我们前面已经使用过了,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
from xModels import XUser, database

with database.atomic() as transaction:
XUser.create(phone='184738373833', password='123456')
XUser.create(phone='184738373833332323232', password='123456')
修饰器
@database.atomic()
def create_user(phone, password):
XUser.create(phone=phone, password=password)
raise Exception('just a test')

create_user(phone='184738373833', password='383838')
上面,由于create_user()中抛出了一个异常,修饰器中会执行rollback(),从而导致创建用户失败。

事务嵌套使用

Peewee中事务还可以进行嵌套,示例如下:

1
2
3
4
5
6
with database.atomic() as txn:
XUser.create(phone='18734738383')

with database.atomic() as nested_txn:
XUser.create(phone='1883328484')
当上面没有抛出异常时,两个用户同时被添加;当其中任何一条语句抛出异常时,两个用户都不会添加。

不过,它还是跟非嵌套有些区别的,看下面示例。

第一种情况,在嵌套事务中执行rollback(),代码如下:

1
2
3
4
5
6
7
with database.atomic() as txn:
XUser.create(phone='188888888')

with database.atomic() as nested_txn:
XUser.create(phone='199999999')
nested_txn.rollback()
结果:188888888用户被添加,而199999999不会被添加。

第二种情况,在外层的事务中执行rollback(),代码如下:

1
2
3
4
5
6
7
8
with database.atomic() as txn:
XUser.create(phone='188888888')

with database.atomic() as nested_txn:
XUser.create(phone='199999999')

txn.rollback()
结果:两个用户都不会被添加。

也就是说,外层的rollback()会将嵌套中的事务也回滚,而嵌套中的事务不能回滚外层的内容。当然,这只是我的一个尝试,可能还有其他的不同,还需要再探索。

全手动实现事务

全手动实现事务使用的是Database.manual_commit()方法,它也有Context manager和修饰器两种方式。

下面,我们使用Context manager方式来实现前面说的atomic()方法,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
with database.manual_commit():
database.begin() # 开始事务
try:
XUser.create(phone='188888888') # 添加用户
except:
database.rollback() # 执行rollback
raise
else:
try:
database.commit() # 没有发生异常,执行commit
except:
database.rollback() #commit发生异常时,执行rollback
raise

总结

Peewee实现事务最简单的方法就是atomic(),它可以使用Context manager和修饰器两种方式,它也可以手动调用commit()和rollback()。

喜欢这篇文章?打赏一下作者吧!

欢迎关注我的其它发布渠道