psycopg2:使用一个查询插入多行
- 2024-12-20 08:37:00
- admin 原创
- 76
问题描述:
我需要用一个查询插入多行(行数不是常数),所以我需要执行如下查询:
INSERT INTO t (a, b) VALUES (1, 2), (3, 4), (5, 6);
我知道的唯一方法是
args = [(1,2), (3,4), (5,6)]
args_str = ','.join(cursor.mogrify("%s", (x, )) for x in args)
cursor.execute("INSERT INTO t (a, b) VALUES "+args_str)
但我想要一些更简单的方法。
解决方案 1:
我编写了一个程序,可以向位于另一个城市的服务器插入多条线路。
我发现使用此方法比 快 10 倍左右executemany
。在我的例子中tup
,元组包含大约 2000 行。使用此方法大约需要 10 秒:
args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str)
使用此方法需要 2 分钟:
cur.executemany("INSERT INTO table VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s)", tup)
解决方案 2:
Psycopg 2.7 中的新execute_values
方法:
data = [(1,'x'), (2,'y')]
insert_query = 'insert into t (a, b) values %s'
psycopg2.extras.execute_values (
cursor, insert_query, data, template=None, page_size=100
)
在 Psycopg 2.6 中采用 Python 方式执行此操作:
data = [(1,'x'), (2,'y')]
records_list_template = ','.join(['%s'] * len(data))
insert_query = 'insert into t (a, b) values {}'.format(records_list_template)
cursor.execute(insert_query, data)
解释:如果要插入的数据以元组列表的形式给出,例如
data = [(1,'x'), (2,'y')]
那么它已经是所需的格式了
values
该子句的语法要求insert
记录列表如下
insert into t (a, b) values (1, 'x'),(2, 'y')
Psycopg
将 Python 适配tuple
到 Postgresqlrecord
。
唯一需要做的工作是提供一个记录列表模板,供 psycopg 填写
# We use the data list to be sure of the template length
records_list_template = ','.join(['%s'] * len(data))
并将其放入insert
查询中
insert_query = 'insert into t (a, b) values {}'.format(records_list_template)
打印insert_query
输出
insert into t (a, b) values %s,%s
现在来谈谈常见的Psycopg
参数替换
cursor.execute(insert_query, data)
或者只是测试将发送到服务器的内容
print (cursor.mogrify(insert_query, data).decode('utf8'))
输出:
insert into t (a, b) values (1, 'x'),(2, 'y')
解决方案 3:
使用 psycopg2 2.7 更新:
经典方法executemany()
比 @ant32 的实现(称为“折叠”)慢约 60 倍,如本主题所述: https: //www.postgresql.org/message-id/20170130215151.GA7081%40deb76.aryehleib.com
此实现在 psycopg2 2.7 版本中被添加到其中,并被称为execute_values()
:
from psycopg2.extras import execute_values
execute_values(cur,
"INSERT INTO test (id, v1, v2) VALUES %s",
[(1, 2, 3), (4, 5, 6), (7, 8, 9)])
上一个答案:
要插入多行,使用多行VALUES
语法execute()
比使用 psycopg2 快 10 倍左右executemany()
。实际上,executemany()
只需运行许多单独的INSERT
语句。
@ant32 的代码在 Python 2 中运行完美。但是在 Python 3 中,cursor.mogrify()
返回字节,cursor.execute()
接受字节或字符串,并','.join()
需要str
实例。
因此在 Python 3 中您可能需要修改 @ant32 的代码,通过添加.decode('utf-8')
:
args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x).decode('utf-8') for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str)
或者仅使用字节(带b''
或b""
):
args_bytes = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute(b"INSERT INTO table VALUES " + args_bytes)
解决方案 4:
cursor.copy_from是迄今为止我发现的批量插入最快的解决方案。这是我制作的要点,其中包含一个名为 IteratorFile 的类,它允许迭代器产生字符串,就像文件一样被读取。我们可以使用生成器表达式将每个输入记录转换为字符串。所以解决方案是
args = [(1,2), (3,4), (5,6)]
f = IteratorFile(("{} {}".format(x[0], x[1]) for x in args))
cursor.copy_from(f, 'table_name', columns=('a', 'b'))
对于这种微不足道的参数大小,它不会对速度产生太大的影响,但我发现在处理数千行以上时,速度会大大提高。它还比构建巨大的查询字符串更节省内存。迭代器一次只能在内存中保存一条输入记录,而在某些时候,你会在 Python 进程或 Postgres 中通过构建查询字符串耗尽内存。
解决方案 5:
来自 Postgresql.org 上 Psycopg2 教程页面的片段(见底部):
我想要向您展示的最后一项是如何使用字典插入多行。如果您有以下内容:
namedict = ({"first_name":"Joshua", "last_name":"Drake"},
{"first_name":"Steven", "last_name":"Foo"},
{"first_name":"David", "last_name":"Bar"})
您可以使用以下命令轻松地将所有三行插入字典中:
cur = conn.cursor()
cur.executemany("""INSERT INTO bar(first_name,last_name) VALUES (%(first_name)s, %(last_name)s)""", namedict)
它并没有节省太多的代码,但它确实看起来更好。
解决方案 6:
安全漏洞
截至 2022-11-16,@Clodoaldo Neto(针对 Psycopg 2.6)、@Joseph Sheedy、@JJ、@Bart Jonk、@kevo Njoki、@TKoutny 和 @Nihal Sharma 的答案包含SQL 注入漏洞,不应使用。
迄今为止最快的提案 ( ) 也不应使用copy_from
,因为很难正确转义数据。当尝试插入诸如'
、"
、`、或 之类的字符时,这一点很容易显现出来。
`
`
`
psycopg2 的作者还建议不要copy_from
:
copy_from() 和 copy_to() 实际上只是古老且不完整的方法
最快的方法
最快的方法是cursor.copy_expert
,可以直接从 CSV 文件插入数据。
with open("mydata.csv") as f:
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", f)
copy_expert
也是动态生成 CSV 文件时最快的方法。作为参考,请参阅以下CSVFile
类,它注意限制内存使用量。
import io, csv
class CSVFile(io.TextIOBase):
# Create a CSV file from rows. Can only be read once.
def __init__(self, rows, size=8192):
self.row_iter = iter(rows)
self.buf = io.StringIO()
self.available = 0
self.size = size
def read(self, n):
# Buffer new CSV rows until enough data is available
buf = self.buf
writer = csv.writer(buf)
while self.available < n:
try:
row_length = writer.writerow(next(self.row_iter))
self.available += row_length
self.size = max(self.size, row_length)
except StopIteration:
break
# Read requested amount of data from buffer
write_pos = buf.tell()
read_pos = write_pos - self.available
buf.seek(read_pos)
data = buf.read(n)
self.available -= len(data)
# Shrink buffer if it grew very large
if read_pos > 2 * self.size:
remaining = buf.read()
buf.seek(0)
buf.write(remaining)
buf.truncate()
else:
buf.seek(write_pos)
return data
然后可以像这样使用此类:
rows = [(1, "a", "b"), (2, "c", "d")]
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", CSVFile(rows))
如果所有数据都适合内存,您也可以直接生成整个 CSV 数据而不使用CSVFile
类,但如果您不知道将来要插入多少数据,则可能不应该这样做。
f = io.StringIO()
writer = csv.writer(f)
for row in rows:
writer.writerow(row)
f.seek(0)
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", f)
基准测试结果
914 毫秒 - 多次调用
cursor.execute
846 毫秒 -
cursor.executemany
362毫秒-
psycopg2.extras.execute_batch
346 毫秒 -
execute_batch
带有page_size=1000
265 毫秒 -
execute_batch
使用准备好的语句161毫秒-
psycopg2.extras.execute_values
127 毫秒 -
cursor.execute
带有字符串连接的值39 毫秒——
copy_expert
一次生成整个 CSV 文件32 毫秒
copy_expert
-CSVFile
解决方案 7:
所有这些技术在 Postgres 术语中都称为“扩展插入”,截至 2016 年 11 月 24 日,它仍然比 psychopg2 的 executemany() 和本线程中列出的所有其他方法(我在得到这个答案之前尝试过)快很多。
这里有一些不使用 cur.mogrify 的代码,它们很好并且简单易懂:
valueSQL = [ '%s', '%s', '%s', ... ] # as many as you have columns.
sqlrows = []
rowsPerInsert = 3 # more means faster, but with diminishing returns..
for row in getSomeData:
# row == [1, 'a', 'yolo', ... ]
sqlrows += row
if ( len(sqlrows)/len(valueSQL) ) % rowsPerInsert == 0:
# sqlrows == [ 1, 'a', 'yolo', 2, 'b', 'swag', 3, 'c', 'selfie' ]
insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*rowsPerInsert)
cur.execute(insertSQL, sqlrows)
con.commit()
sqlrows = []
insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*len(sqlrows))
cur.execute(insertSQL, sqlrows)
con.commit()
但需要注意的是,如果可以使用copy_from(),那么就应该使用copy_from;)
解决方案 8:
10 年后,Psycopg 3 和 PostgreSQL 14 或更新版本的答案是:使用管道模式。在Psycopg 3 的管道模式实现中,在 execute/executemany 语句中使用普通 INSERT可以快速安全地防止SQL 注入。从 Psycopg 3.1 开始,executemany() 在内部使用管道模式2。
启动管道模式和 executemany() 的示例。自 Psycopg 3.1 起,如果仅调用一次,conn.pipeline() 仅对 execute() 是必要的,而对 executemany() 不再是必要的。
args = [(1,2), (3,4), (5,6)]
with conn.pipeline():
cur.executemany("INSERT INTO t (a, b) VALUES (%s, %s)", args)
解决方案 9:
我已经使用上述 ant32 的答案好几年了。但是我发现在 python 3 中会引发错误,因为它mogrify
返回的是字节字符串。
明确转换为字节串字符串是使代码与 Python 3 兼容的一个简单解决方案。
args_str = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute(b"INSERT INTO table VALUES " + args_str)
解决方案 10:
executemany接受元组数组
https://www.postgresqltutorial.com/postgresql-python/insert/
""" array of tuples """
vendor_list = [(value1,)]
""" insert multiple vendors into the vendors table """
sql = "INSERT INTO vendors(vendor_name) VALUES(%s)"
conn = None
try:
# read database configuration
params = config()
# connect to the PostgreSQL database
conn = psycopg2.connect(**params)
# create a new cursor
cur = conn.cursor()
# execute the INSERT statement
cur.executemany(sql,vendor_list)
# commit the changes to the database
conn.commit()
# close communication with the database
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
解决方案 11:
@jopseph.sheedy (https://stackoverflow.com/users/958118/joseph-sheedy )提供的cursor.copyfrom解决方案(https://stackoverflow.com/a/30721460/11100064)确实速度极快。
但是,他给出的示例对于具有任意数量字段的记录并不具有普遍适用性,我花了一段时间才弄清楚如何正确使用它。
IteratorFile 需要用制表符分隔的字段来实例化,如下所示(r
是一个字典列表,其中每个字典都是一条记录):
f = IteratorFile("{0} {1} {2} {3} {4}".format(r["id"],
r["type"],
r["item"],
r["month"],
r["revenue"]) for r in records)
为了概括任意数量的字段,我们首先创建一个具有正确数量的制表符和字段占位符的行字符串:"{} {} {}.... {}"
,然后使用它.format()
来填写字段值 *list(r.values())) for r in records
::
line = " ".join(["{}"] * len(records[0]))
f = IteratorFile(line.format(*list(r.values())) for r in records)
完整的功能要点在这里。
解决方案 12:
自从这个问题发布以来,execute_batch已经被添加到 psycopg2 中。
它比execute_values更快。
解决方案 13:
另一种好的、有效的方法是将要插入的行作为 1 个参数传递,该参数是 json 对象的数组。
例如你传递的参数:
[ {id: 18, score: 1}, { id: 19, score: 5} ]
它是一个数组,里面可以包含任意数量的对象。那么你的 SQL 如下所示:
INSERT INTO links (parent_id, child_id, score)
SELECT 123, (r->>'id')::int, (r->>'score')::int
FROM unnest($1::json[]) as r
注意:你的 postgress 必须足够新,以支持 json
解决方案 14:
如果您使用 SQLAlchemy,那么您不需要手动输入字符串,因为 SQLAlchemy支持为单个语句生成多行VALUES
`INSERT`子句:
rows = []
for i, name in enumerate(rawdata):
row = {
'id': i,
'name': name,
'valid': True,
}
rows.append(row)
if len(rows) > 0: # INSERT fails if no rows
insert_query = SQLAlchemyModelName.__table__.insert().values(rows)
session.execute(insert_query)
解决方案 15:
psycopg2 2.9.3
data = "(1, 2), (3, 4), (5, 6)"
query = "INSERT INTO t (a, b) VALUES {0}".format(data)
cursor.execute(query)
或者
data = [(1, 2), (3, 4), (5, 6)]
data = ",".join(map(str, data))
query = "INSERT INTO t (a, b) VALUES {0}".format(data)
cursor.execute(query)
解决方案 16:
Psycopg 3 提供了一种使用 Postgres COPY的简单方法,效率很高。
records = [(1,2), (3,4), (5,6)]
with cursor.copy("COPY example_table (col_a, col_b) FROM STDIN") as copy:
for record in records:
copy.write_row(record)
官方文档中提供了更多信息。
解决方案 17:
如果您想在一个插入语句中插入多行(假设您不使用 ORM),到目前为止,对我来说最简单的方法是使用字典列表。以下是一个例子:
t = [{'id':1, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 6},
{'id':2, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 7},
{'id':3, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 8}]
conn.execute("insert into campaign_dates
(id, start_date, end_date, campaignid)
values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);",
t)
如您所见,只会执行一个查询:
INFO sqlalchemy.engine.base.Engine insert into campaign_dates (id, start_date, end_date, campaignid) values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);
INFO sqlalchemy.engine.base.Engine [{'campaignid': 6, 'id': 1, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 7, 'id': 2, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 8, 'id': 3, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}]
INFO sqlalchemy.engine.base.Engine COMMIT
解决方案 18:
来自@ant32
def myInsertManyTuples(connection, table, tuple_of_tuples):
cursor = connection.cursor()
try:
insert_len = len(tuple_of_tuples[0])
insert_template = "("
for i in range(insert_len):
insert_template += "%s,"
insert_template = insert_template[:-1] + ")"
args_str = ",".join(
cursor.mogrify(insert_template, x).decode("utf-8")
for x in tuple_of_tuples
)
cursor.execute("INSERT INTO " + table + " VALUES " + args_str)
connection.commit()
except psycopg2.Error as e:
print(f"psycopg2.Error in myInsertMany = {e}")
connection.rollback()
解决方案 19:
我使用的解决方案可以在 1 毫秒内插入 8000 条记录
curtime = datetime.datetime.now()
postData = dict()
postData["title"] = "This is Title Text"
postData["body"] = "This a Body Text it Can be Long Text"
postData['created_at'] = curtime.isoformat()
postData['updated_at'] = curtime.isoformat()
data = []
for x in range(8000):
data.append(((postData)))
vals = []
for d in postData:
vals.append(tuple(d.values())) #Here we extract the Values from the Dict
flds = ",".join(map(str, postData[0]))
tableFlds = ",".join(map(str, vals))
sqlStr = f"INSERT INTO posts ({flds}) VALUES {tableFlds}"
db.execute(sqlStr)
connection.commit()
rowsAffected = db.rowcount
print(f'{rowsAffected} Rows Affected')
解决方案 20:
最后在 SQLalchemy1.2 版本中,当您使用 use_batch_mode=True 初始化引擎时,添加了这个新实现以使用 psycopg2.extras.execute_batch() 而不是 executemany,如下所示:
engine = create_engine(
"postgresql+psycopg2://scott:tiger@host/dbname",
use_batch_mode=True)
http://docs.sqlalchemy.org/en/latest/changelog/migration_12.html#change-4109
然后有人将不得不使用 SQLalchmey,而不会费心尝试 sqla 和 psycopg2 以及 direct SQL 的不同组合。
解决方案 21:
使用 aiopg - 下面的代码片段运行良好
# items = [10, 11, 12, 13]
# group = 1
tup = [(gid, pid) for pid in items]
args_str = ",".join([str(s) for s in tup])
# insert into group values (1, 10), (1, 11), (1, 12), (1, 13)
yield from cur.execute("INSERT INTO group VALUES " + args_str)
- 2024年20款好用的项目管理软件推荐,项目管理提效的20个工具和技巧
- 2024年开源项目管理软件有哪些?推荐5款好用的项目管理工具
- 2024年常用的项目管理软件有哪些?推荐这10款国内外好用的项目管理工具
- 项目管理软件有哪些?推荐7款超好用的项目管理工具
- 项目管理软件有哪些最好用?推荐6款好用的项目管理工具
- 项目管理软件哪个最好用?盘点推荐5款好用的项目管理工具
- 项目管理软件有哪些,盘点推荐国内外超好用的7款项目管理工具
- 项目管理软件排行榜:2024年项目经理必备5款开源项目管理软件汇总
- 2024项目管理软件排行榜(10类常用的项目管理工具全推荐)
- 项目管理必备:盘点2024年13款好用的项目管理软件