# Source code for fundamentals.mysql.sqlite2mysql

#!/usr/local/bin/python
# encoding: utf-8
"""
Take a sqlite database file and copy the tables within it to a MySQL database

Usage:
    sqlite2mysql -s <pathToSettingsFile> <pathToSqliteDB> [<tablePrefix>]

Options:

    pathToSqliteDB        path to the sqlite database file
    tablePrefix           a string to prefix the table names when copying to mysql database
    pathToSettingsFile    path to a settings file with logging and database information (yaml file)

    -h, --help            show this help message
    -v, --version         show version
    -s, --settings        the settings file
"""

from builtins import object
import sys
import os
import sqlite3 as lite

os.environ["TERM"] = "vt100"
from fundamentals import tools
from fundamentals.mysql import writequery

from datetime import datetime, date, time


def main(arguments=None):
    """
    *The main function used when ``sqlite2mysql`` is run as a command-line tool*

    **Key Arguments**

    - ``arguments`` -- handed straight to ``fundamentals.tools``. Default *None*
    """
    # setup the command-line util settings
    su = tools(
        arguments=arguments,
        docString=__doc__,
        logLevel="WARNING",
        options_first=False,
        projectName=False,
    )
    arguments, settings, log, dbConn = su.setup()

    # unpack remaining cl arguments into a dictionary keyed by variable name.
    # NOTE: this used to be done with `exec`, but in Python 3 `exec` cannot
    # create function locals (the names were never bound and the later lookups
    # raised NameError), and values containing quotes broke the generated code.
    clArgs = {}
    for arg, val in arguments.items():
        if arg[0] == "-":
            # `--settings` -> `settingsFlag`
            varname = arg.replace("-", "") + "Flag"
        else:
            # `<pathToSqliteDB>` -> `pathToSqliteDB`
            varname = arg.replace("<", "").replace(">", "")
        clArgs[varname] = val
        if arg == "--dbConn":
            dbConn = val
        log.debug(
            "%s = %s"
            % (
                varname,
                val,
            )
        )

    from fundamentals.mysql import sqlite2mysql

    converter = sqlite2mysql(
        log=log,
        settings=settings,
        pathToSqlite=clArgs["pathToSqliteDB"],
        # optional on the command line; the class treats a falsy prefix as ""
        tablePrefix=clArgs.get("tablePrefix"),
        dbConn=dbConn,
    )
    converter.convert_sqlite_to_mysql()

    return
class sqlite2mysql(object):
    """
    *Take a sqlite database file and copy the tables within it to a MySQL database*

    **Key Arguments**

    - ``log`` -- logger
    - ``settings`` -- the settings dictionary
    - ``pathToSqlite`` -- path to the sqlite database to transfer into the MySQL database
    - ``tablePrefix`` -- a prefix to add to all the table names when converting to mysql. Default *""*
    - ``dbConn`` -- mysql database connection

    **Usage**

    To setup your logger, settings and database connections, please use the ``fundamentals`` package (see tutorial here https://fundamentals.readthedocs.io/en/master/initialisation.html).

    To convert and import the content of a sqlite database into MySQL run the following:

    .. todo::

        - add a tutorial about ``sqlite2mysql`` to documentation

    ```python
    from fundamentals.mysql import sqlite2mysql
    converter = sqlite2mysql(
        log=log,
        settings=settings,
        pathToSqlite="/path/to/sqlite.db",
        tablePrefix="external"
    )
    converter.convert_sqlite_to_mysql()
    ```
    """

    # Book-keeping columns appended to every MySQL table. Every column ends
    # with ",\n" so that a duplicated `dateCreated` line can be stripped out
    # again once the statement has been assembled.
    _BOOKKEEPING_COLUMNS = (
        ",\n"
        "`dateCreated` datetime DEFAULT CURRENT_TIMESTAMP,\n"
        "`dateLastModified` datetime DEFAULT CURRENT_TIMESTAMP,\n"
        "`updated` tinyint(4) DEFAULT '0'"
    )

    # sqlite -> mysql keyword/default rewrites, applied in order BEFORE the
    # table is renamed and the book-keeping columns are added
    _KEYWORD_REPLACEMENTS = (
        ("INTEGER PRIMARY KEY", "INTEGER AUTO_INCREMENT PRIMARY KEY"),
        ("AUTOINCREMENT", "AUTO_INCREMENT"),
        ("DEFAULT 't'", "DEFAULT '1'"),
        ("DEFAULT 'f'", "DEFAULT '0'"),
        (",'t'", ",'1'"),
        (",'f'", ",'0'"),
    )

    # column-type rewrites, applied in order AFTER the book-keeping columns
    # have been added
    _COLUMN_REPLACEMENTS = (
        (" text primary key", " varchar(100) primary key"),
        ("`EntryText` TEXT NOT NULL,", "`EntryText` TEXT,"),
        ("`SelectionText` TEXT NOT NULL", "`SelectionText` TEXT"),
        ("`Filename` INTEGER NOT NULL,", "`Filename` TEXT NOT NULL,"),
        (
            "`SessionPartUUID` TEXT NOT NULL UNIQUE,",
            "`SessionPartUUID` VARCHAR(100) NOT NULL UNIQUE,",
        ),
        (
            "`Name` TEXT PRIMARY KEY NOT NULL",
            "`Name` VARCHAR(100) PRIMARY KEY NOT NULL",
        ),
        (" VARCHAR ", " VARCHAR(100) "),
        (" VARCHAR,", " VARCHAR(100),"),
    )

    # Initialisation
    def __init__(self, log, pathToSqlite, tablePrefix="", settings=False, dbConn=False):
        self.log = log
        log.debug("instansiating a new 'sqlite2mysql' object")
        self.settings = settings
        self.pathToSqlite = pathToSqlite
        self.dbConn = dbConn
        # a non-empty prefix is joined to the table names with an underscore;
        # anything falsy (None, False, "") means "no prefix"
        self.tablePrefix = tablePrefix + "_" if tablePrefix else ""
        # xt-self-arg-tmpx

        return None

    def convert_sqlite_to_mysql(self):
        """*copy the contents of the sqlite database into the mysql database*

        Every table listed in ``sqlite_master`` (except ``sqlite_sequence``) is
        recreated in MySQL and its rows inserted. The sqlite connection is
        closed even if the conversion of a table fails.

        See class docstring for usage
        """
        self.log.debug("starting the ``convert_sqlite_to_mysql`` method")

        con = lite.connect(self.pathToSqlite)
        try:
            con.row_factory = lite.Row
            cur = con.cursor()

            # GET ALL TABLE NAMES
            cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
            tables = [row["name"] for row in cur.fetchall()]

            for table in tables:
                if table == "sqlite_sequence":
                    continue
                self._copy_table(cur, table)
        finally:
            # RELEASE THE SQLITE FILE WHETHER OR NOT THE COPY SUCCEEDED
            con.close()

        self.log.debug("completed the ``convert_sqlite_to_mysql`` method")
        return None

    def _copy_table(self, cur, table):
        """*create one sqlite table in MySQL and insert all of its rows*

        **Key Arguments**

        - ``cur`` -- open sqlite cursor (rows must be ``sqlite3.Row`` objects)
        - ``table`` -- name of the sqlite table to copy
        """
        from fundamentals.mysql import (
            insert_list_of_dictionaries_into_database_tables,
        )

        # GENERATE THE MYSQL CREATE STATEMENT FOR THE TABLE
        # (table name is bound as a parameter so quotes in the name are safe)
        cur.execute("SELECT sql FROM sqlite_master WHERE name = ?;", (table,))
        createStatement = self._mysql_create_statement(cur.fetchone()[0])

        # GRAB THE DATA TO ADD TO THE MYSQL DATABASE TABLE
        # (identifiers cannot be bound, so quote the name and escape any `"`)
        cur.execute('SELECT * FROM "%s";' % (table.replace('"', '""'),))
        columnNames = [column[0] for column in cur.description]
        allRows = [dict(row) for row in cur.fetchall()]

        # CHECK FOR DATE-CREATED COLUMN - DON'T WANT TO DUPLICATE
        dateCreated = not any("datecreated" in name.lower() for name in columnNames)

        # NOTE(review): this scratch directory is not used by anything in this
        # method; it is kept only so the method's filesystem side effect is
        # unchanged -- confirm whether it can be dropped.
        if not os.path.exists("/tmp/headjack/"):
            os.makedirs("/tmp/headjack/")

        writequery(
            log=self.log,
            sqlQuery=createStatement,
            dbConn=self.dbConn,
        )

        # USE dbSettings TO ACTIVATE MULTIPROCESSING
        # (``settings`` defaults to False, so guard the lookup)
        dbSettings = False
        if self.settings:
            dbSettings = self.settings.get("database settings", False)

        insert_list_of_dictionaries_into_database_tables(
            dbConn=self.dbConn,
            log=self.log,
            dictList=allRows,
            dbTableName=self.tablePrefix + table,
            uniqueKeyList=[],
            dateModified=True,
            dateCreated=dateCreated,
            batchSize=10000,
            replace=True,
            dbSettings=dbSettings,
        )

    def _mysql_create_statement(self, sqliteStatement):
        """*translate a sqlite ``CREATE TABLE`` statement into its MySQL equivalent*

        **Key Arguments**

        - ``sqliteStatement`` -- the ``sql`` value stored in ``sqlite_master`` for the table

        **Return**

        - ``statement`` -- the MySQL create statement (table name prefixed with ``tablePrefix``)
        """
        statement = sqliteStatement.replace('"', "`") + ";"

        # UNTYPED COLUMNS, e.g.
        # CREATE TABLE collection_books (folder_id, fingerprint, primary key(folder_id, fingerprint));
        # GIVE EACH COLUMN A DEFAULT VARCHAR TYPE
        # NOTE(review): both `primary key` branches are assumed to sit under
        # the DEFAULT check -- confirm against the upstream file.
        if "DEFAULT" not in statement:
            if "primary key(" in statement:
                parts = statement.split("primary key(")
                parts[0] = parts[0].replace(",", " varchar(150) DEFAULT NULL,")
                statement = "primary key(".join(parts)
            if "primary key," in statement:
                parts = statement.split("primary key,")
                parts[1] = parts[1].replace(",", " varchar(150) DEFAULT NULL,")
                parts[1] = parts[1].replace(");", " varchar(150) DEFAULT NULL);")
                statement = "primary key,".join(parts)

        for old, new in self._KEYWORD_REPLACEMENTS:
            statement = statement.replace(old, new)

        # PREFIX THE TABLE NAME AND MAKE THE STATEMENT RE-RUNNABLE
        if "CREATE TABLE `" in statement:
            statement = statement.replace(
                "CREATE TABLE `", "CREATE TABLE IF NOT EXISTS `" + self.tablePrefix
            )
        else:
            statement = statement.replace(
                "CREATE TABLE ", "CREATE TABLE IF NOT EXISTS " + self.tablePrefix
            )

        # ADD THE BOOK-KEEPING COLUMNS, KEEPING ANY TABLE-LEVEL PRIMARY KEY LAST
        if ", primary key(" in statement:
            statement = statement.replace(
                ", primary key(", self._BOOKKEEPING_COLUMNS + ",\nprimary key("
            )
        else:
            statement = statement.replace(");", self._BOOKKEEPING_COLUMNS + ");\n")

        for old, new in self._COLUMN_REPLACEMENTS:
            statement = statement.replace(old, new)

        # THE SQLITE TABLE ALREADY HAD A DATE-CREATED COLUMN - DROP THE ONE JUST ADDED
        if len(statement.lower().split("datecreated")) > 2:
            statement = statement.replace(
                "`dateCreated` datetime DEFAULT CURRENT_TIMESTAMP,\n", ""
            )

        return statement
# use the tab-trigger below for new method # xt-class-method if __name__ == "__main__": main()