手机版

python3.6使用sqlalchemy读取mysql中的数据并进行多进程并发处理

时间:2020-05-05 来源:互联网 编辑:宝哥软件园 浏览:
1. 介绍 SQLALChemy2. 安装 SQLAlChemy2.1 创建测试数据库2.2 用 SQLALChemy 创建数据库表3. 多进程搜索程序3.1 程序关键点3.2 程序运行

1. 介绍 SQLALChemy

SQLALChemy 是一个 python 的 ORM(Object Relational Mapper) 框架,开发人员可以快速开发操作数据库的程序,
它提供完整的数据库访问层,提供高性能的数据库访问能力。
它支持 SQLite、MySQL、Postgres、Oracle 等常用的数据库访问

2. 安装 SQLAlChemy

pip install sqlalchemy

2.1 创建测试数据库

# 建立数据库
CREATE DATABASE `test` /*!40100 DEFAULT CHARACTER SET utf8mb4 */;

2.2 用 SQLALChemy 创建数据库表

2.2.1 程序关键点

创建操作数据库的 engine,使用 pymysql 库访问 mysql 数据库 创建操作数据库的 session,绑定到 engine 上 从 Base 继承定义 User,Article 类,对应 mapping 到数据库的 member,article 表 使用 session.create_all 创建数据库表结构 session.add_all 新增数据到数据库 session.commit 提交所有变更到数据库,此时可以再数据库中查询插入的数据 查询数据使用 session.query 方法,也可以在后面连接使用 filter 进行条件过滤
#!/usr/bin/env python
# coding: utf-8


from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker


# 创建数据库 engine 使用 utf8 编码
eng = create_engine('mysql+pymysql://root:1@localhost:3306/test?charset=utf8')
Base = declarative_base()

# 创建 session 类,绑定engine
Session = sessionmaker(bind=eng)
session = Session()


class User(Base):
    '''
    用户类,对应数据库的 member 表
    '''
    __tablename__ = 'member'

    # 定义表字段
    mid = Column(Integer, primary_key=True)
    nickname = Column(String(50))
    email = Column(String(128))

    def __repl__(self):
        return '<User(name={}, email={}, nickname{}>'.format(mid,
                                                             email,
                                                             nickname)


class Article(Base):
    '''
    文章类,对应数据库中的 article 表
    '''
    __tablename__ = 'article'

    # 定义表字段
    arid = Column(Integer, primary_key=True)
    tags = Column(String(128))
    description = Column(String(256))
    title = Column(String(256))


def create_table():
    '''
    创建数据库表结构,导入初始数据
    '''
    # 创建表
    Base.metadata.create_all(eng)

    # 插入数据
    session.add_all([
        User(mid=1, nickname='测试数据 test hello', email='test@gmail.com'),
        User(mid=2, nickname='测试数据 china hello', email='test@gmail.com'),
        User(mid=3, nickname='测试数据 上海 hello', email='test@gmail.com'),
        User(mid=4, nickname='测试数据 北京 hello', email='test@gmail.com'),
        User(mid=5, nickname='测试数据 上海 hello', email='test@gmail.com'),
        User(mid=6, nickname='测试数据 山东 hello', email='test@gmail.com'),
        User(mid=7, nickname='测试数据 武夷山 hello', email='test@gmail.com'),
        User(mid=8, nickname='测试数据 黄山 hello', email='test@gmail.com'),

        Article(arid=1, tags='测试数据 test hello', title='销售额度', description='测试 test ok'),
        Article(arid=2, tags='测试数据 china hello', title='成功转型', description='测试 test ok'),
        Article(arid=3, tags='测试数据 上海 hello', title='蓝蓝的天上白云飘', description='测试 test ok'),
        Article(arid=4, tags='测试数据 背景 hello', title='在水一方', description='测试 test ok'),
        Article(arid=5, tags='测试数据 上海 hello', title='晴天,阴天,雨天,大风天', description='测试 test ok'),
        Article(arid=6, tags='测试数据 山东 hello', title='每年365天,每天24小时', description='测试 test ok'),
        Article(arid=7, tags='测试数据 武夷山 hello', title='高效工作的秘密', description='测试 test ok'),
        Article(arid=8, tags='测试数据 黄山 hello', title='战狼2', description='测试 test ok'),
    ]
    )

    # 提交到数据库
    session.commit()


def modify_data():
    '''
    测试修改数据
    '''

    # 查询用户表
    users = session.query(User).all()
    print(users)

    # 查询文章表
    # articles = session.query(Article).all()
    articles = session.query(Article).filter(Article.arid==2)
    print(articles)

    # 修改文章表
    articles[0].description = '程度,修改描述可以成功'
    print(session.dirty)

    # 提交到数据库
    session.commit()


if __name__ == '__main__':
    create_engine()
    # modify_data()

3. 多进程搜索程序

3.1 程序关键点

创建 DbMgr 类,封装数据库访问操作 用 sqlalchemy 从数据库获取 member,article 表中的数据 使用 automap 自动反射出 member,article 表对应的类 创建 Searcher 类,提供进程调用函数,用来查询符合条件的结果,并且提供进程执行完的回调展示方法 创建 10 个进程的进程池 循环获取用户输入,创建 searcher 对象,多进程并发执行过滤 多进程调用使用 python multiprocessing
#!/usr/bin/env python
# coding: utf-8


import os


from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.automap import automap_base
from multiprocessing import Pool


class DbMgr():
    '''
    连接数据库,从数据库获取 用户表,文章表数据
    '''

    def __init__(self):
        eng = create_engine('mysql+pymysql://root:1@localhost/test?charset=utf8')

        # 使用 automap 自动反射出 member,article 表对应的类
        meta = MetaData()
        meta.reflect(eng, only=['member', 'article'])
        Base = automap_base(metadata=meta)
        Base.prepare()

        self._Member = Base.classes.member
        self._Article = Base.classes.article

        # 获取操作数据库的 session
        Session = sessionmaker(eng, autocommit=True)
        self._ses = Session()

    def get_data(self):
        '''
        查询用户表,文章表
        '''
        self._users = self._ses.query('"user"', self._Member.mid,
                                      self._Member.email,
                                      self._Member.nickname).all()

        self._articles = self._ses.query(self._Article).all()
        self._articles = [('ar', i.arid, i.title, i.tags, i.description)
                          for i in self._articles]

        return list(self._users) + list(self._articles)


class Searcher():
    '''
    进城处理函数,查找符合条件的结果,找到后返回结果
    '''
    def __init__(self, keyword):
        self._keyword = keyword

    def run(self, data):
        '''
        查找字符串
        '''
        try:
            if self._keyword in str(data):
                return 'ret: ' + str(os.getpid()) + '->' + str(data)
            else:
                return None
        except Exception as e:
            return e

    def callback(self, data):
        '''
        全部执行完后回调函数,展示结果
        '''
        try:
            for i in data:
                if i:
                    print('match: {}'.format(i))
        except Exception as e:
            print(e)


def main():
    # 从数据库读取数据
    mgr = DbMgr()

    # 创建过滤进程池
    pool = Pool(4)

    # 创建搜索器
    while True:
        keyword = input('
输入搜索词:  ')
        if keyword == 'q':
            break

        searcher = Searcher(keyword)

        # 从数据库获取数据
        data = mgr.get_data()
        res = pool.map_async(searcher.run,
                             data,
                             10,
                             callback=searcher.callback)

        # 等待所有进程执行完成
        res.wait()
        print('all done', res.successful())


if __name__ == '__main__':
    main()

3.2 程序运行

(py36env) servadmin@debian:~/test # python mysql2es.py

输入搜索词:  test
match: ret: 10013->('user', 1, 'test@gmail.com', '测试数据 test hello')
match: ret: 10013->('user', 2, 'test@gmail.com', '测试数据 china hello')
match: ret: 10013->('user', 3, 'test@gmail.com', '测试数据 上海 hello')
match: ret: 10013->('user', 4, 'test@gmail.com', '测试数据 背景 hello')
match: ret: 10013->('user', 5, 'test@gmail.com', '测试数据 上海 hello')
match: ret: 10013->('user', 6, 'test@gmail.com', '测试数据 山东 hello')
match: ret: 10013->('user', 7, 'test@gmail.com', '测试数据 武夷山 hello')
match: ret: 10013->('user', 8, 'test@gmail.com', '测试数据 黄山 hello')
match: ret: 10013->('ar', 1, '销售额度', '测试数据 test hello', '程度,修改描述可以成功')
match: ret: 10013->('ar', 3, '蓝蓝的天上白云飘', '测试数据 上海 hello', '测试 test ok')
match: ret: 10013->('ar', 4, '在水一方', '测试数据 背景 hello', '测 test ok')
match: ret: 10013->('ar', 5, '晴天,阴天,雨天,大风天', '测试数据 上海 hello', '测试 test ok')
match: ret: 10013->('ar', 6, '每年365天,每天24小时', '测试数据 山东 hello', '测试 test ok')
match: ret: 10013->('ar', 7, '高效工作的秘密', '测试数据 武夷山 hello', '测试 test ok')
match: ret: 10013->('ar', 8, '战狼2', '测试数据 黄山 hello', '测试 test ok')
all done True

输入搜索词:  转型
match: ret: 10013->('ar', 2, '成功转型', '测试数据 china hello', '程度,修改描述可以成功')
all done True

输入搜索词:  test
match: ret: 10013->('user', 1, 'test@gmail.com', '测试数据 test hello')
match: ret: 10013->('user', 2, 'test@gmail.com', '测试数据 china hello')
match: ret: 10013->('user', 3, 'test@gmail.com', '测试数据 上海 hello')
match: ret: 10013->('user', 4, 'test@gmail.com', '测试数据 背景 hello')
match: ret: 10013->('user', 5, 'test@gmail.com', '测试数据 上海 hello')
match: ret: 10013->('user', 6, 'test@gmail.com', '测试数据 山东 hello')
match: ret: 10013->('user', 7, 'test@gmail.com', '测试数据 武夷山 hello')
match: ret: 10013->('user', 8, 'test@gmail.com', '测试数据 黄山 hello')
match: ret: 10013->('ar', 1, '销售额度', '测试数据 test hello', '程度,修改描述可以成功')
match: ret: 10013->('ar', 3, '蓝蓝的天上白云飘', '测试数据 上海 hello', '测试 test ok')
match: ret: 10013->('ar', 4, '在水一方', '测试数据 背景 hello', '测试 test ok')
match: ret: 10013->('ar', 5, '晴天,阴天,雨天,大风天', '测试数据 上海 hello', '测试 test ok')
match: ret: 10013->('ar', 6, '每年365天,每天24小时', '测试数据 山东 hello', '测试 test ok')
match: ret: 10013->('ar', 7, '高效工作的秘密', '测试数据 武夷山 hello', '测试 test ok')
match: ret: 10013->('ar', 8, '战狼2', '测试数据 黄山 hello', '测试 test ok')
all done True

输入搜索词:

版权声明:python3.6使用sqlalchemy读取mysql中的数据并进行多进程并发处理是由宝哥软件园云端程序自动收集整理而来。如果本文侵犯了你的权益,请联系本站底部QQ或者邮箱删除。

相关文章推荐