psycopg2:使用一个查询插入多行

2024-12-20 08:37:00
admin
原创
76
摘要:问题描述:我需要用一个查询插入多行(行数不是常数),所以我需要执行如下查询:INSERT INTO t (a, b) VALUES (1, 2), (3, 4), (5, 6); 我知道的唯一方法是args = [(1,2), (3,4), (5,6)] args_str = ','.join(cursor....

问题描述:

我需要用一个查询插入多行(行数不是常数),所以我需要执行如下查询:

INSERT INTO t (a, b) VALUES (1, 2), (3, 4), (5, 6);

我知道的唯一方法是

args = [(1,2), (3,4), (5,6)]
args_str = ','.join(cursor.mogrify("%s", (x, )) for x in args)
cursor.execute("INSERT INTO t (a, b) VALUES "+args_str)

但我想要一些更简单的方法。


解决方案 1:

我编写了一个程序,可以向位于另一个城市的服务器插入多条线路。

我发现使用此方法比 快 10 倍左右executemany。在我的例子中tup,元组包含大约 2000 行。使用此方法大约需要 10 秒:

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str) 

使用此方法需要 2 分钟:

cur.executemany("INSERT INTO table VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s)", tup)

解决方案 2:

Psycopg 2.7 中的新execute_values方法:

data = [(1,'x'), (2,'y')]
insert_query = 'insert into t (a, b) values %s'
psycopg2.extras.execute_values (
    cursor, insert_query, data, template=None, page_size=100
)

在 Psycopg 2.6 中采用 Python 方式执行此操作:

data = [(1,'x'), (2,'y')]
records_list_template = ','.join(['%s'] * len(data))
insert_query = 'insert into t (a, b) values {}'.format(records_list_template)
cursor.execute(insert_query, data)

解释:如果要插入的数据以元组列表的形式给出,例如

data = [(1,'x'), (2,'y')]

那么它已经是所需的格式了

  1. values该子句的语法要求insert记录列表如下

insert into t (a, b) values (1, 'x'),(2, 'y')

  1. Psycopg将 Python 适配tuple到 Postgresql record

唯一需要做的工作是提供一个记录列表模板,供 psycopg 填写

# We use the data list to be sure of the template length
records_list_template = ','.join(['%s'] * len(data))

并将其放入insert查询中

insert_query = 'insert into t (a, b) values {}'.format(records_list_template)

打印insert_query输出

insert into t (a, b) values %s,%s

现在来谈谈常见的Psycopg参数替换

cursor.execute(insert_query, data)

或者只是测试将发送到服务器的内容

print (cursor.mogrify(insert_query, data).decode('utf8'))

输出:

insert into t (a, b) values (1, 'x'),(2, 'y')

解决方案 3:

使用 psycopg2 2.7 更新:

经典方法executemany()比 @ant32 的实现(称为“折叠”)慢约 60 倍,如本主题所述: https: //www.postgresql.org/message-id/20170130215151.GA7081%40deb76.aryehleib.com

此实现在 psycopg2 2.7 版本中被添加到其中,并被称为execute_values()

from psycopg2.extras import execute_values
execute_values(cur,
    "INSERT INTO test (id, v1, v2) VALUES %s",
    [(1, 2, 3), (4, 5, 6), (7, 8, 9)])

上一个答案:

要插入多行,使用多行VALUES语法execute()比使用 psycopg2 快 10 倍左右executemany()。实际上,executemany()只需运行许多单独的INSERT语句。

@ant32 的代码在 Python 2 中运行完美。但是在 Python 3 中,cursor.mogrify()返回字节,cursor.execute()接受字节或字符串,并','.join()需要str实例。

因此在 Python 3 中您可能需要修改 @ant32 的代码,通过添加.decode('utf-8')

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x).decode('utf-8') for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str)

或者仅使用字节(带b''b""):

args_bytes = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute(b"INSERT INTO table VALUES " + args_bytes) 

解决方案 4:

cursor.copy_from是迄今为止我发现的批量插入最快的解决方案。这是我制作的要点,其中包含一个名为 IteratorFile 的类,它允许迭代器产生字符串,就像文件一样被读取。我们可以使用生成器表达式将每个输入记录转换为字符串。所以解决方案是

args = [(1,2), (3,4), (5,6)]
f = IteratorFile(("{}    {}".format(x[0], x[1]) for x in args))
cursor.copy_from(f, 'table_name', columns=('a', 'b'))

对于这种微不足道的参数大小,它不会对速度产生太大的影响,但我发现在处理数千行以上时,速度会大大提高。它还比构建巨大的查询字符串更节省内存。迭代器一次只能在内存中保存一条输入记录,而在某些时候,你会在 Python 进程或 Postgres 中通过构建查询字符串耗尽内存。

解决方案 5:

来自 Postgresql.org 上 Psycopg2 教程页面的片段(见底部):

我想要向您展示的最后一项是如何使用字典插入多行。如果您有以下内容:

namedict = ({"first_name":"Joshua", "last_name":"Drake"},
            {"first_name":"Steven", "last_name":"Foo"},
            {"first_name":"David", "last_name":"Bar"})

您可以使用以下命令轻松地将所有三行插入字典中:

cur = conn.cursor()
cur.executemany("""INSERT INTO bar(first_name,last_name) VALUES (%(first_name)s, %(last_name)s)""", namedict)

它并没有节省太多的代码,但它确实看起来更好。

解决方案 6:

安全漏洞

截至 2022-11-16,@Clodoaldo Neto(针对 Psycopg 2.6)、@Joseph Sheedy、@JJ、@Bart Jonk、@kevo Njoki、@TKoutny 和 @Nihal Sharma 的答案包含SQL 注入漏洞,不应使用。

迄今为止最快的提案 ( ) 也不应使用copy_from,因为很难正确转义数据。当尝试插入诸如'"、`
、或 之类的字符时,这一点很容易显现出来。` `
`

psycopg2 的作者还建议不要copy_from

copy_from() 和 copy_to() 实际上只是古老且不完整的方法

最快的方法

最快的方法是cursor.copy_expert,可以直接从 CSV 文件插入数据。

with open("mydata.csv") as f:
    cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", f)

copy_expert也是动态生成 CSV 文件时最快的方法。作为参考,请参阅以下CSVFile类,它注意限制内存使用量。

import io, csv

class CSVFile(io.TextIOBase):
    # Create a CSV file from rows. Can only be read once.
    def __init__(self, rows, size=8192):
        self.row_iter = iter(rows)
        self.buf = io.StringIO()
        self.available = 0
        self.size = size

    def read(self, n):
        # Buffer new CSV rows until enough data is available
        buf = self.buf
        writer = csv.writer(buf)
        while self.available < n:
            try:
                row_length = writer.writerow(next(self.row_iter))
                self.available += row_length
                self.size = max(self.size, row_length)
            except StopIteration:
                break

        # Read requested amount of data from buffer
        write_pos = buf.tell()
        read_pos = write_pos - self.available
        buf.seek(read_pos)
        data = buf.read(n)
        self.available -= len(data)

        # Shrink buffer if it grew very large
        if read_pos > 2 * self.size:
            remaining = buf.read()
            buf.seek(0)
            buf.write(remaining)
            buf.truncate()
        else:
            buf.seek(write_pos)

        return data

然后可以像这样使用此类:

rows = [(1, "a", "b"), (2, "c", "d")]
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", CSVFile(rows))

如果所有数据都适合内存,您也可以直接生成整个 CSV 数据而不使用CSVFile类,但如果您不知道将来要插入多少数据,则可能不应该这样做。

f = io.StringIO()
writer = csv.writer(f)
for row in rows:
    writer.writerow(row)
f.seek(0)
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", f)

基准测试结果

  • 914 毫秒 - 多次调用cursor.execute

  • 846 毫秒 -cursor.executemany

  • 362毫秒-psycopg2.extras.execute_batch

  • 346 毫秒 -execute_batch带有page_size=1000

  • 265 毫秒 -execute_batch使用准备好的语句

  • 161毫秒-psycopg2.extras.execute_values

  • 127 毫秒 -cursor.execute带有字符串连接的值

  • 39 毫秒——copy_expert一次生成整个 CSV 文件

  • 32 毫秒copy_expert-CSVFile

解决方案 7:

所有这些技术在 Postgres 术语中都称为“扩展插入”,截至 2016 年 11 月 24 日,它仍然比 psychopg2 的 executemany() 和本线程中列出的所有其他方法(我在得到这个答案之前尝试过)快很多。

这里有一些不使用 cur.mogrify 的代码,它们很好并且简单易懂:

valueSQL = [ '%s', '%s', '%s', ... ] # as many as you have columns.
sqlrows = []
rowsPerInsert = 3 # more means faster, but with diminishing returns..
for row in getSomeData:
        # row == [1, 'a', 'yolo', ... ]
        sqlrows += row
        if ( len(sqlrows)/len(valueSQL) ) % rowsPerInsert == 0:
                # sqlrows == [ 1, 'a', 'yolo', 2, 'b', 'swag', 3, 'c', 'selfie' ]
                insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*rowsPerInsert)
                cur.execute(insertSQL, sqlrows)
                con.commit()
                sqlrows = []
insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*len(sqlrows))
cur.execute(insertSQL, sqlrows)
con.commit()

但需要注意的是,如果可以使用copy_from(),那么就应该使用copy_from;)

解决方案 8:

10 年后,Psycopg 3 和 PostgreSQL 14 或更新版本的答案是:使用管道模式。在Psycopg 3 的管道模式实现中,在 execute/executemany 语句中使用普通 INSERT可以快速安全地防止SQL 注入。从 Psycopg 3.1 开始,executemany() 在内部使用管道模式2。

启动管道模式和 executemany() 的示例。自 Psycopg 3.1 起,如果仅调用一次,conn.pipeline() 仅对 execute() 是必要的,而对 executemany() 不再是必要的。

args = [(1,2), (3,4), (5,6)]
with conn.pipeline():
  cur.executemany("INSERT INTO t (a, b) VALUES (%s, %s)", args)

解决方案 9:

我已经使用上述 ant32 的答案好几年了。但是我发现在 python 3 中会引发错误,因为它mogrify返回的是字节字符串。

明确转换为字节串字符串是使代码与 Python 3 兼容的一个简单解决方案。

args_str = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup) 
cur.execute(b"INSERT INTO table VALUES " + args_str)

解决方案 10:

executemany接受元组数组

https://www.postgresqltutorial.com/postgresql-python/insert/

    """ array of tuples """
    vendor_list = [(value1,)]

    """ insert multiple vendors into the vendors table  """
    sql = "INSERT INTO vendors(vendor_name) VALUES(%s)"
    conn = None
    try:
        # read database configuration
        params = config()
        # connect to the PostgreSQL database
        conn = psycopg2.connect(**params)
        # create a new cursor
        cur = conn.cursor()
        # execute the INSERT statement
        cur.executemany(sql,vendor_list)
        # commit the changes to the database
        conn.commit()
        # close communication with the database
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()

解决方案 11:

@jopseph.sheedy (https://stackoverflow.com/users/958118/joseph-sheedy )提供的cursor.copyfrom解决方案(https://stackoverflow.com/a/30721460/11100064)确实速度极快。

但是,他给出的示例对于具有任意数量字段的记录并不具有普遍适用性,我花了一段时间才弄清楚如何正确使用它。

IteratorFile 需要用制表符分隔的字段来实例化,如下所示(r是一个字典列表,其中每个字典都是一条记录):

    f = IteratorFile("{0}    {1}    {2}    {3}    {4}".format(r["id"],
        r["type"],
        r["item"],
        r["month"],
        r["revenue"]) for r in records)

为了概括任意数量的字段,我们首先创建一个具有正确数量的制表符和字段占位符的行字符串:"{} {} {}.... {}",然后使用它.format()来填写字段值 *list(r.values())) for r in records::

        line = "    ".join(["{}"] * len(records[0]))

        f = IteratorFile(line.format(*list(r.values())) for r in records)

完整的功能要点在这里。

解决方案 12:

自从这个问题发布以来,execute_batch已经被添加到 psycopg2 中。

它比execute_values更快。

解决方案 13:

另一种好的、有效的方法是将要插入的行作为 1 个参数传递,该参数是 json 对象的数组。

例如你传递的参数:

[ {id: 18, score: 1}, { id: 19, score: 5} ]

它是一个数组,里面可以包含任意数量的对象。那么你的 SQL 如下所示:

INSERT INTO links (parent_id, child_id, score) 
SELECT 123, (r->>'id')::int, (r->>'score')::int 
FROM unnest($1::json[]) as r 

注意:你的 postgress 必须足够新,以支持 json

解决方案 14:

如果您使用 SQLAlchemy,那么您不需要手动输入字符串,因为 SQLAlchemy支持为单个语句生成多行VALUES`INSERT`子句:

rows = []
for i, name in enumerate(rawdata):
    row = {
        'id': i,
        'name': name,
        'valid': True,
    }
    rows.append(row)
if len(rows) > 0:  # INSERT fails if no rows
    insert_query = SQLAlchemyModelName.__table__.insert().values(rows)
    session.execute(insert_query)

解决方案 15:

psycopg2 2.9.3

data = "(1, 2), (3, 4), (5, 6)"
query = "INSERT INTO t (a, b) VALUES {0}".format(data)
cursor.execute(query)

或者

data = [(1, 2), (3, 4), (5, 6)]
data = ",".join(map(str, data))
query = "INSERT INTO t (a, b) VALUES {0}".format(data)
cursor.execute(query)

解决方案 16:

Psycopg 3 提供了一种使用 Postgres COPY的简单方法,效率很高。

records = [(1,2), (3,4), (5,6)]
with cursor.copy("COPY example_table (col_a, col_b) FROM STDIN") as copy:
    for record in records:
        copy.write_row(record)

官方文档中提供了更多信息。

解决方案 17:

如果您想在一个插入语句中插入多行(假设您不使用 ORM),到目前为止,对我来说最简单的方法是使用字典列表。以下是一个例子:

 t = [{'id':1, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 6},
      {'id':2, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 7},
      {'id':3, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 8}]

conn.execute("insert into campaign_dates
             (id, start_date, end_date, campaignid) 
              values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);",
             t)

如您所见,只会执行一个查询:

INFO sqlalchemy.engine.base.Engine insert into campaign_dates (id, start_date, end_date, campaignid) values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);
INFO sqlalchemy.engine.base.Engine [{'campaignid': 6, 'id': 1, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 7, 'id': 2, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 8, 'id': 3, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}]
INFO sqlalchemy.engine.base.Engine COMMIT

解决方案 18:

来自@ant32

def myInsertManyTuples(connection, table, tuple_of_tuples):
    cursor = connection.cursor()
    try:
        insert_len = len(tuple_of_tuples[0])
        insert_template = "("
        for i in range(insert_len):
            insert_template += "%s,"
        insert_template = insert_template[:-1] + ")"

        args_str = ",".join(
            cursor.mogrify(insert_template, x).decode("utf-8")
            for x in tuple_of_tuples
        )
        cursor.execute("INSERT INTO " + table + " VALUES " + args_str)
        connection.commit()

    except psycopg2.Error as e:
        print(f"psycopg2.Error in myInsertMany = {e}")
        connection.rollback()

解决方案 19:

我使用的解决方案可以在 1 毫秒内插入 8000 条记录

curtime = datetime.datetime.now()
postData = dict()
postData["title"] = "This is Title Text"
postData["body"] = "This a Body Text it Can be Long Text"
postData['created_at'] = curtime.isoformat()
postData['updated_at'] = curtime.isoformat()
data = []
for x in range(8000):
    data.append(((postData)))
vals = []
for d in postData:
    vals.append(tuple(d.values())) #Here we extract the Values from the Dict
flds = ",".join(map(str, postData[0]))
tableFlds =  ",".join(map(str, vals))
sqlStr = f"INSERT INTO posts ({flds}) VALUES {tableFlds}"
db.execute(sqlStr)
connection.commit()
rowsAffected = db.rowcount
print(f'{rowsAffected} Rows Affected')

解决方案 20:

最后在 SQLalchemy1.2 版本中,当您使用 use_batch_mode=True 初始化引擎时,添加了这个新实现以使用 psycopg2.extras.execute_batch() 而不是 executemany,如下所示:

engine = create_engine(
    "postgresql+psycopg2://scott:tiger@host/dbname",
    use_batch_mode=True)

http://docs.sqlalchemy.org/en/latest/changelog/migration_12.html#change-4109

然后有人将不得不使用 SQLalchmey,而不会费心尝试 sqla 和 psycopg2 以及 direct SQL 的不同组合。

解决方案 21:

使用 aiopg - 下面的代码片段运行良好

    # items = [10, 11, 12, 13]
    # group = 1
    tup = [(gid, pid) for pid in items]
    args_str = ",".join([str(s) for s in tup])
    # insert into group values (1, 10), (1, 11), (1, 12), (1, 13)
    yield from cur.execute("INSERT INTO group VALUES " + args_str)
相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   990  
  在项目管理领域,CDCP(Certified Data Center Professional)认证评审是一个至关重要的环节,它不仅验证了项目团队的专业能力,还直接关系到项目的成功与否。在这一评审过程中,沟通技巧的运用至关重要。有效的沟通不仅能够确保信息的准确传递,还能增强团队协作,提升评审效率。本文将深入探讨CDCP...
华为IPD流程   26  
  IPD(Integrated Product Development,集成产品开发)是一种以客户需求为核心、跨部门协同的产品开发模式,旨在通过高效的资源整合和流程优化,提升产品开发的成功率和市场竞争力。在IPD培训课程中,掌握关键成功因素是确保团队能够有效实施这一模式的核心。以下将从五个关键成功因素展开讨论,帮助企业和...
IPD项目流程图   27  
  华为IPD(Integrated Product Development,集成产品开发)流程是华为公司在其全球化进程中逐步构建和完善的一套高效产品开发管理体系。这一流程不仅帮助华为在技术创新和产品交付上实现了质的飞跃,还为其在全球市场中赢得了显著的竞争优势。IPD的核心在于通过跨部门协作、阶段性评审和市场需求驱动,确保...
华为IPD   26  
  华为作为全球领先的通信技术解决方案提供商,其成功的背后离不开一套成熟的管理体系——集成产品开发(IPD)。IPD不仅是一种产品开发流程,更是一种系统化的管理思想,它通过跨职能团队的协作、阶段评审机制和市场需求驱动的开发模式,帮助华为在全球市场中脱颖而出。从最初的国内市场到如今的全球化布局,华为的IPD体系在多个领域展现...
IPD管理流程   53  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用