Importing a CSV file into an sqlite3 database table using Python


Problem description:

I have a CSV file and I want to bulk-import this file into my sqlite3 database using Python. The command is ".import .....", but it doesn't seem to work that way. Can someone give me an example of how to do this in sqlite3? I am using Windows, just in case. Thanks.


Solution 1:

import csv, sqlite3

con = sqlite3.connect(":memory:") # use a file path such as 'your_filename.db' to persist to disk
cur = con.cursor()
cur.execute("CREATE TABLE t (col1, col2);") # use your column names here

with open('data.csv','r') as fin: # `with` statement available in 2.5+
    # csv.DictReader uses first line in file for column headings by default
    dr = csv.DictReader(fin) # comma is default delimiter
    to_db = [(i['col1'], i['col2']) for i in dr]

cur.executemany("INSERT INTO t (col1, col2) VALUES (?, ?);", to_db)
con.commit()
con.close()

Solution 2:

Creating the sqlite connection to a file on disk is left as an exercise for the reader ... but the pandas library now reduces the import itself to two lines:

import pandas, sqlite3

conn = sqlite3.connect('your_filename.db')  # connect to a database file on disk
df = pandas.read_csv(csvfile)
df.to_sql(table_name, conn, if_exists='append', index=False)

Solution 3:

You're right that .import is the way to go, but that's a command from the SQLite3 command-line program. A lot of the top answers to this question involve native Python loops, but if your files are large (mine are 10^6 to 10^7 records), you want to avoid reading everything into pandas or using native Python list comprehensions/loops (though I did not time them for comparison).

For large files, I believe the best option is to use subprocess.run() to execute sqlite's import command. In the example below, I assume the table already exists, but the csv file has headers in its first row. See the .import documentation for more information.

import subprocess
from pathlib import Path
db_name = Path('my.db').resolve()
csv_file = Path('file.csv').resolve()
result = subprocess.run(['sqlite3',
                         str(db_name),
                         '-cmd',
                         '.mode csv',
                         '.import --skip 1 ' + str(csv_file).replace('\\','\\\\')
                                 +' <table_name>'],
                        capture_output=True)

Edit note: sqlite3's .import command has improved so that it can treat the first line as header names and even skip the first x rows (requires version >= 3.32, as described in this answer). If you have an older version of sqlite3, you may need to create the table first, then strip the first row of the csv before importing. The --skip 1 argument gives an error prior to 3.32.
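
If you need to guard against an older sqlite3 binary before relying on --skip, a minimal version check might look like this (a sketch; it assumes the sqlite3 executable is on your PATH):

import subprocess

# The CLI prints something like "3.39.2 2022-07-21 ..."; take the major.minor part
out = subprocess.run(['sqlite3', '--version'], capture_output=True, text=True)
major, minor = (int(p) for p in out.stdout.split()[0].split('.')[:2])
supports_skip = (major, minor) >= (3, 32)  # --skip was added in sqlite 3.32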

Explanation

From the command line, the command you're looking for is sqlite3 my.db -cmd ".mode csv" ".import file.csv table". subprocess.run() runs a command-line process. The argument to subprocess.run() is a sequence of strings, which are interpreted as a command followed by all of its arguments.

  • sqlite3 my.db opens the database

  • the -cmd flag after the database allows you to pass multiple follow-on commands to the sqlite program. In the shell, each command has to be in quotes, but here they just need to be their own elements of the sequence

  • '.mode csv' does what you'd expect

  • '.import --skip 1 ' + str(csv_file).replace('\\','\\\\') + ' <table_name>' is the import command.

Unfortunately, since subprocess passes everything that follows -cmd as a quoted string, you need to double up your backslashes if you have a Windows directory path.

Stripping the headers

This isn't really the point of the question, but here's what I used. Again, I didn't want to read the whole file into memory at any point:

import shutil

with open(csv, "r") as source:
    source.readline()
    with open(str(csv)+"_nohead", "w") as target:
        shutil.copyfileobj(source, target)

Solution 4:

My take (rather generic):

import csv, sqlite3
import logging

def _get_col_datatypes(fin):
    dr = csv.DictReader(fin) # comma is default delimiter
    fieldTypes = {}
    for entry in dr:
        feildslLeft = [f for f in dr.fieldnames if f not in fieldTypes.keys()]
        if not feildslLeft: break # We're done
        for field in feildslLeft:
            data = entry[field]

            # Need data to decide
            if len(data) == 0:
                continue

            if data.isdigit():
                fieldTypes[field] = "INTEGER"
            else:
                fieldTypes[field] = "TEXT"
        # TODO: Currently there's no support for DATE in sqlite

    if len(feildslLeft) > 0:
        raise Exception("Failed to find all the columns data types - Maybe some are empty?")

    return fieldTypes


def escapingGenerator(f):
    for line in f:
        yield line.encode("ascii", "xmlcharrefreplace").decode("ascii")


def csvToDb(csvFile, outputToFile = False):
    # TODO: implement output to file

    with open(csvFile,mode='r', encoding="ISO-8859-1") as fin:
        dt = _get_col_datatypes(fin)

        fin.seek(0)

        reader = csv.DictReader(fin)

        # Keep the order of the columns name just as in the CSV
        fields = reader.fieldnames
        cols = []

        # Set field and type
        for f in fields:
            cols.append("%s %s" % (f, dt[f]))

        # Generate create table statement:
        stmt = "CREATE TABLE ads (%s)" % ",".join(cols)

        con = sqlite3.connect(":memory:")
        cur = con.cursor()
        cur.execute(stmt)

        fin.seek(0)


        reader = csv.reader(escapingGenerator(fin))

        # Generate insert statement:
        stmt = "INSERT INTO ads VALUES(%s);" % ','.join('?' * len(cols))

        cur.executemany(stmt, reader)
        con.commit()

    return con

Solution 5:

The .import command is a feature of the sqlite3 command-line tool. To do this in Python, you should simply load the data using whatever facilities Python has, such as the csv module, and then insert the data as usual.

That way you also keep control over what types are inserted, rather than relying on sqlite3's seemingly undocumented behaviour.
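
A minimal sketch of that approach (the database, file, table, and column names here are hypothetical; the int() call is where you take explicit control of a column's type):

import csv, sqlite3

con = sqlite3.connect("example.db")  # hypothetical database file
con.execute("CREATE TABLE IF NOT EXISTS t (id INTEGER, name TEXT)")

with open("data.csv", newline="") as fin:  # hypothetical csv with an id,name header
    rows = ((int(r["id"]), r["name"]) for r in csv.DictReader(fin))
    con.executemany("INSERT INTO t (id, name) VALUES (?, ?)", rows)

con.commit()
con.close()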

Solution 6:

Many thanks to Bernie's answer! Had to tweak it a bit - here's what worked for me:

import csv, sqlite3
conn = sqlite3.connect("pcfc.sl3")
curs = conn.cursor()
curs.execute("CREATE TABLE PCFC (id INTEGER PRIMARY KEY, type INTEGER, term TEXT, definition TEXT);")
with open('PC.txt', 'r', encoding='utf-8') as f:
    reader = csv.reader(f, delimiter='|')
    for row in reader:
        to_db = [row[0], row[1], row[2]]  # the csv module already yields str in Python 3
        curs.execute("INSERT INTO PCFC (type, term, definition) VALUES (?, ?, ?);", to_db)
conn.commit()

My text file (PC.txt) looks like this:

1 | Term 1 | Definition 1
2 | Term 2 | Definition 2
3 | Term 3 | Definition 3

Solution 7:

#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys, csv, sqlite3

def main():
    con = sqlite3.connect(sys.argv[1]) # database file input
    cur = con.cursor()
    cur.executescript("""
        DROP TABLE IF EXISTS t;
        CREATE TABLE t (COL1 TEXT, COL2 TEXT);
        """) # checks to see if table exists and makes a fresh table.

    with open(sys.argv[2], "r", newline="") as f: # CSV file input
        reader = csv.reader(f, delimiter=',') # no header information with delimiter
        for row in reader:
            to_db = [row[0], row[1]] # the csv module already yields str in Python 3
            cur.execute("INSERT INTO t (COL1, COL2) VALUES(?, ?);", to_db)
        con.commit() # commit once after the loop instead of per row
    con.close() # closes connection to database

if __name__=='__main__':
    main()

Solution 8:

The following will work if your CSV file turns out to be very large. Use to_sql as suggested by another answer, but set chunksize so it doesn't try to process the whole file at once.

import sqlite3
import pandas as pd

conn = sqlite3.connect('my_data.db')
c = conn.cursor()
users = pd.read_csv('users.csv')
users.to_sql('users', conn, if_exists='append', index = False, chunksize = 10000)

You can also use Dask, as described here, to write a lot of Pandas DataFrames in parallel:

dto_sql = dask.delayed(pd.DataFrame.to_sql)
out = [dto_sql(d, 'table_name', db_url, if_exists='append', index=True)
       for d in ddf.to_delayed()]
dask.compute(*out)

See here for more details.

Solution 9:

"""
cd Final_Codes
python csv_to_db.py
CSV to SQL DB
"""

import csv
import sqlite3
import os
import fnmatch

UP_FOLDER = os.path.dirname(os.getcwd())
DATABASE_FOLDER = os.path.join(UP_FOLDER, "Databases")
DBNAME = "allCompanies_database.db"


def getBaseNameNoExt(givenPath):
    """Returns the basename of the file without the extension"""
    filename = os.path.splitext(os.path.basename(givenPath))[0]
    return filename


def find(pattern, path):
    """Utility to find files wrt a regex search"""
    result = []
    for root, dirs, files in os.walk(path):
        for name in files:
            if fnmatch.fnmatch(name, pattern):
                result.append(os.path.join(root, name))
    return result


if __name__ == "__main__":
    Database_Path = os.path.join(DATABASE_FOLDER, DBNAME)
    csv_files = find('*.csv', DATABASE_FOLDER)

    con = sqlite3.connect(Database_Path)
    cur = con.cursor()
    for each in csv_files:
        with open(each, 'r') as fin:  # `with` statement available in 2.5+
            # csv.DictReader uses first line in file for column headings by default
            dr = csv.DictReader(fin)  # comma is default delimiter
            TABLE_NAME = getBaseNameNoExt(each)
            Cols = dr.fieldnames
            numCols = len(Cols)
            """
            for i in dr:
                print(i.values())
            """
            to_db = [tuple(i.values()) for i in dr]
            print(TABLE_NAME)
            # use your column names here
            ColString = ','.join(Cols)
            QuestionMarks = ["?"] * numCols
            ToAdd = ','.join(QuestionMarks)
            cur.execute(f"CREATE TABLE {TABLE_NAME} ({ColString});")
            cur.executemany(
                f"INSERT INTO {TABLE_NAME} ({ColString}) VALUES ({ToAdd});", to_db)
            con.commit()
    con.close()
    print("Execution Complete!")

This should come in handy when you have a lot of csv files in a folder and want to turn them into a single .db file in one go!

Note that you don't have to know the file names, table names, or field names (column names) beforehand!

Solution 10:

If you must import a CSV file as part of a python program, then for simplicity and efficiency, you could use os.system along the lines suggested below:

import os

cmd = """sqlite3 database.db <<< ".import input.csv mytable" """

rc = os.system(cmd)

print(rc)

The point is that by specifying the filename of the database, the data will automatically be saved, assuming there are no errors reading it.
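
Note that the <<< here-string above is a bash feature, so this exact command won't work in the Windows shell the question mentions. A sketch of a more portable variant passes the dot-commands as arguments instead of through a shell (same database.db/input.csv/mytable names as above):

import subprocess

# No shell involved: .mode goes through -cmd, and the final dot-command
# is executed before sqlite3 exits
subprocess.run(
    ['sqlite3', 'database.db', '-cmd', '.mode csv', '.import input.csv mytable'],
    check=True,
)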

Solution 11:

Based on Guy L's solution (love it) but can handle escaped fields.

import csv, sqlite3

def _get_col_datatypes(fin):
    dr = csv.DictReader(fin) # comma is default delimiter
    fieldTypes = {}
    for entry in dr:
        feildslLeft = [f for f in dr.fieldnames if f not in fieldTypes.keys()]        
        if not feildslLeft: break # We're done
        for field in feildslLeft:
            data = entry[field]

            # Need data to decide
            if len(data) == 0:
                continue

            if data.isdigit():
                fieldTypes[field] = "INTEGER"
            else:
                fieldTypes[field] = "TEXT"
        # TODO: Currently there's no support for DATE in sqlite

    if len(feildslLeft) > 0:
        raise Exception("Failed to find all the columns data types - Maybe some are empty?")

    return fieldTypes


def escapingGenerator(f):
    for line in f:
        yield line.encode("ascii", "xmlcharrefreplace").decode("ascii")


def csvToDb(csvFile,dbFile,tablename, outputToFile = False):

    # TODO: implement output to file

    with open(csvFile,mode='r', encoding="ISO-8859-1") as fin:
        dt = _get_col_datatypes(fin)

        fin.seek(0)

        reader = csv.DictReader(fin)

        # Keep the order of the columns name just as in the CSV
        fields = reader.fieldnames
        cols = []

        # Set field and type
        for f in fields:
            cols.append("\"%s\" %s" % (f, dt[f]))

        # Generate create table statement:
        stmt = "create table if not exists \"" + tablename + "\" (%s)" % ",".join(cols)
        print(stmt)
        con = sqlite3.connect(dbFile)
        cur = con.cursor()
        cur.execute(stmt)

        fin.seek(0)


        reader = csv.reader(escapingGenerator(fin))

        # Generate insert statement:
        stmt = "INSERT INTO \"" + tablename + "\" VALUES(%s);" % ','.join('?' * len(cols))

        cur.executemany(stmt, reader)
        con.commit()
        con.close()

Solution 12:

You can do this efficiently using blaze & odo:

import blaze as bz
csv_path = 'data.csv'
bz.odo(csv_path, 'sqlite:///data.db::data')

Odo will store the csv file into data.db (an sqlite database) under the schema data.

Or use odo directly, without blaze. Either way is fine. Read this documentation.
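
For reference, a sketch of the odo-only variant (same file and target URI as above):

from odo import odo

# Same effect as the blaze call: load data.csv into the `data` table of data.db
odo('data.csv', 'sqlite:///data.db::data')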

Solution 13:

The following can also add field names based on the CSV header:

import sqlite3

def csv_sql(file_dir,table_name,database_name):
    con = sqlite3.connect(database_name)
    cur = con.cursor()
    # Drop the current table by: 
    # cur.execute("DROP TABLE IF EXISTS %s;" % table_name)

    with open(file_dir, 'r') as fl:
        hd = fl.readline()[:-1].split(',')
        ro = fl.readlines()
        db = [tuple(ro[i][:-1].split(',')) for i in range(len(ro))]

    header = ','.join(hd)
    cur.execute("CREATE TABLE IF NOT EXISTS %s (%s);" % (table_name,header))
    cur.executemany("INSERT INTO %s (%s) VALUES (%s);" % (table_name,header,('?,'*len(hd))[:-1]), db)
    con.commit()
    con.close()

# Example:
csv_sql('./surveys.csv','survey','eco.db')

Solution 14:

In the interest of simplicity, you could use the sqlite3 command-line tool from your project's Makefile.

%.sql3: %.csv
    rm -f $@
    sqlite3 $@ -echo -cmd ".mode csv" ".import $< $*"
%.dump: %.sql3
    sqlite3 $< "select * from $*"

make test.sql3 then creates the sqlite database from an existing test.csv file, with a single table "test". You can then make test.dump to verify the contents.


Solution 15:

With this you can also do joins on CSVs:

import sqlite3
import os
import pandas as pd
from typing import List

class CSVDriver:
    def __init__(self, table_dir_path: str):
        self.table_dir_path = table_dir_path  # where tables (ie. csv files) are located
        self._con = None

    @property
    def con(self) -> sqlite3.Connection:
        """Make a singleton connection to an in-memory SQLite database"""
        if not self._con:
            self._con = sqlite3.connect(":memory:")
        return self._con
    
    def _exists(self, table: str) -> bool:
        query = """
        SELECT name
        FROM sqlite_master 
        WHERE type ='table'
        AND name NOT LIKE 'sqlite_%';
        """
        tables = [name for (name,) in self.con.execute(query).fetchall()]
        return table in tables

    def _load_table_to_mem(self, table: str, sep: str = None) -> None:
        """
        Load a CSV into an in-memory SQLite database
        sep is set to None in order to force pandas to auto-detect the delimiter
        """
        if self._exists(table):
            return
        file_name = table + ".csv"
        path = os.path.join(self.table_dir_path, file_name)
        if not os.path.exists(path):
            raise ValueError(f"CSV table {table} does not exist in {self.table_dir_path}")
        df = pd.read_csv(path, sep=sep, engine="python")  # set engine to python to skip pandas' warning
        df.to_sql(table, self.con, if_exists='replace', index=False, chunksize=10000)

    def query(self, query: str) -> List[tuple]:
        """
        Run an SQL query on CSV file(s). 
        Tables are loaded from table_dir_path
        """
        tables = extract_tables(query)
        for table in tables:
            self._load_table_to_mem(table)
        cursor = self.con.cursor()
        cursor.execute(query)
        records = cursor.fetchall()
        return records

extract_tables():

import sqlparse
from sqlparse.sql import IdentifierList, Identifier,  Function
from sqlparse.tokens import Keyword, DML
from collections import namedtuple
import itertools

class Reference(namedtuple('Reference', ['schema', 'name', 'alias', 'is_function'])):
    __slots__ = ()

    def has_alias(self):
        return self.alias is not None

    @property
    def is_query_alias(self):
        return self.name is None and self.alias is not None

    @property
    def is_table_alias(self):
        return self.name is not None and self.alias is not None and not self.is_function

    @property
    def full_name(self):
        if self.schema is None:
            return self.name
        else:
            return self.schema + '.' + self.name

def _is_subselect(parsed):
    if not parsed.is_group:
        return False
    for item in parsed.tokens:
        if item.ttype is DML and item.value.upper() in ('SELECT', 'INSERT',
                                                        'UPDATE', 'CREATE', 'DELETE'):
            return True
    return False


def _identifier_is_function(identifier):
    return any(isinstance(t, Function) for t in identifier.tokens)


def _extract_from_part(parsed):
    tbl_prefix_seen = False
    for item in parsed.tokens:
        if item.is_group:
            for x in _extract_from_part(item):
                yield x
        if tbl_prefix_seen:
            if _is_subselect(item):
                for x in _extract_from_part(item):
                    yield x
            # An incomplete nested select won't be recognized correctly as a
            # sub-select. eg: 'SELECT * FROM (SELECT id FROM user'. This causes
            # the second FROM to trigger this elif condition resulting in a
            # StopIteration. So we need to ignore the keyword if the keyword
            # FROM.
            # Also 'SELECT * FROM abc JOIN def' will trigger this elif
            # condition. So we need to ignore the keyword JOIN and its variants
            # INNER JOIN, FULL OUTER JOIN, etc.
            elif item.ttype is Keyword and (
                    not item.value.upper() == 'FROM') and (
                    not item.value.upper().endswith('JOIN')):
                tbl_prefix_seen = False
            else:
                yield item
        elif item.ttype is Keyword or item.ttype is Keyword.DML:
            item_val = item.value.upper()
            if (item_val in ('COPY', 'FROM', 'INTO', 'UPDATE', 'TABLE') or
                    item_val.endswith('JOIN')):
                tbl_prefix_seen = True
        # 'SELECT a, FROM abc' will detect FROM as part of the column list.
        # So this check here is necessary.
        elif isinstance(item, IdentifierList):
            for identifier in item.get_identifiers():
                if (identifier.ttype is Keyword and
                        identifier.value.upper() == 'FROM'):
                    tbl_prefix_seen = True
                    break


def _extract_table_identifiers(token_stream):
    for item in token_stream:
        if isinstance(item, IdentifierList):
            for ident in item.get_identifiers():
                try:
                    alias = ident.get_alias()
                    schema_name = ident.get_parent_name()
                    real_name = ident.get_real_name()
                except AttributeError:
                    continue
                if real_name:
                    yield Reference(schema_name, real_name,
                                    alias, _identifier_is_function(ident))
        elif isinstance(item, Identifier):
            yield Reference(item.get_parent_name(), item.get_real_name(),
                            item.get_alias(), _identifier_is_function(item))
        elif isinstance(item, Function):
            yield Reference(item.get_parent_name(), item.get_real_name(),
                            item.get_alias(), _identifier_is_function(item))


def extract_tables(sql):
    # let's handle multiple statements in one sql string
    extracted_tables = []
    statements = list(sqlparse.parse(sql))
    for statement in statements:
        stream = _extract_from_part(statement)
        extracted_tables.append([ref.name for ref in _extract_table_identifiers(stream)])
    return list(itertools.chain(*extracted_tables))

Example (assuming account.csv and tojoin.csv exist in /path/to/files):

db_path = r"/path/to/files"
driver = CSVDriver(db_path)
query = """
SELECT tojoin.col_to_join 
FROM account
LEFT JOIN tojoin
ON account.a = tojoin.a
"""
driver.query(query)

Solution 16:

This is my version; it already works by asking you to select the '.csv' file you want to convert

import pandas as pd
import sqlite3 
import os
from tkinter import Tk
from tkinter.filedialog import askopenfilename
from pathlib import Path

def csv_to_db(csv_filedir):

    if not Path(csv_filedir).is_file():                         # if needed ask for user input of CSV file
        current_path = os.getcwd()
        Tk().withdraw()                                     
        csv_filedir = askopenfilename(initialdir=current_path) 

    try:
        data = pd.read_csv(csv_filedir)                             # load CSV file
    except:
        print("Something went wrong when opening to the file")
        print(csv_filedir)

    csv_df = pd.DataFrame(data)
    csv_df = csv_df.fillna('NULL')                              # make NaN = to 'NULL' for SQL format

    [path,filename] = os.path.split(csv_filedir)                # define path and filename 
    [filename,_] = os.path.splitext(filename)
    database_filedir = os.path.join(path, filename + '.db')

    conn = sqlite3.connect(database_filedir)                    # connect to SQL server

    [fields_sql, header_sql_string] = create_sql_fields(csv_df)

    # CREATE EMPTY DATABASE
    create_sql = ''.join(['CREATE TABLE IF NOT EXISTS ' + filename + ' (' + fields_sql + ')'])
    cursor = conn.cursor()
    cursor.execute(create_sql)
    
    # INSERT EACH ROW IN THE SQL DATABASE
    for irow in csv_df.itertuples():
        insert_values_string = ''.join(['INSERT INTO ', filename, header_sql_string, ' VALUES ('])
        insert_sql = f"{insert_values_string} {irow[1]}, '{irow[2]}','{irow[3]}', {irow[4]}, '{irow[5]}' )"
        print(insert_sql)
        cursor.execute(insert_sql)

    # COMMIT CHANGES TO DATABASE AND CLOSE CONNECTION
    conn.commit()
    conn.close()

    print('\n' + csv_filedir + '\nconverted to\n' + database_filedir)

    return database_filedir


def create_sql_fields(df):                                          # gather the headers of the CSV and create two strings 
    fields_sql = []                                                 # str1 = var1 TYPE, va2, TYPE ...
    header_names = []                                               # str2 = var1, var2, var3, var4
    for col in range(0,len(df.columns)):
        fields_sql.append(df.columns[col])
        fields_sql.append(str(df.dtypes[col]))

        header_names.append(df.columns[col])
        if col != len(df.columns)-1:
            fields_sql.append(',')
            header_names.append(',')

    fields_sql = ' '.join(fields_sql)
    fields_sql = fields_sql.replace('int64','integer')
    fields_sql = fields_sql.replace('float64','integer')
    fields_sql = fields_sql.replace('object','text')

    header_sql_string = '(' + ''.join(header_names) + ')'
    
    return fields_sql, header_sql_string


csv_to_db('')

Solution 17:

I've found that it can be necessary to break up the transfer of data from the csv to the database in chunks so as not to run out of memory. This can be done like this:

import csv
import sqlite3
from operator import itemgetter

# Establish connection
conn = sqlite3.connect("mydb.db")

# Create the table 
conn.execute(
    """
    CREATE TABLE persons(
        person_id INTEGER,
        last_name TEXT, 
        first_name TEXT, 
        address TEXT
    )
    """
)

# These are the columns from the csv that we want
cols = ["person_id", "last_name", "first_name", "address"]

# If the csv file is huge, we instead add the data in chunks
chunksize = 10000

# Parse csv file and populate db in chunks
with conn, open("persons.csv") as f:
    reader = csv.DictReader(f)

    chunk = []
    for i, row in enumerate(reader):

        if i % chunksize == 0 and i > 0:
            conn.executemany(
                """
                INSERT INTO persons
                    VALUES(?, ?, ?, ?)
                """, chunk
            )
            chunk = []

        items = itemgetter(*cols)(row)
        chunk.append(items)

    # Insert the remaining rows of the final, partial chunk
    if chunk:
        conn.executemany("INSERT INTO persons VALUES(?, ?, ?, ?)", chunk)
Solution 18:

To send csv data into an sqlite3 database, you could use the following approach -

import csv
import sqlite3

def csv_to_db(csv_filepath, db_filepath, table):
    data = []
    with open(csv_filepath) as file:
        reader = csv.reader(file)
        for row in reader:
            data.append(tuple(row))
            # insert operation in sqlite3 takes tuples as input

    connection = sqlite3.connect(db_filepath)
    cursor = connection.cursor()

    # Insert new rows (this example assumes a six-column table)
    for row in data:
        cursor.execute(f"INSERT INTO {table} VALUES(?,?,?,?,?,?)", row)

    connection.commit()
    connection.close()