Source code for fundamentals.mysql.database

#!/usr/local/bin/python
# encoding: utf-8
"""
*a database object that can setup up a ssh tunnel (optional) and a database connection*

Author
: David Young
"""

from builtins import object
import sys
import os

os.environ["TERM"] = "vt100"
try:
    import readline
except ImportError:
    # Windows: readline not available in stdlib Optionally use pyreadline3 if installed
    try:
        import pyreadline3 as readline
    except ImportError:
        readline = None  # or just pass — basic input() still works
import glob
import pickle
import time
from subprocess import Popen, PIPE, STDOUT
from docopt import docopt
from fundamentals.mysql import readquery


[docs] class database(object): """ *a database object that can setup up a ssh tunnel (optional) and a database connection* **Key Arguments** - ``log`` -- logger - ``dbSettings`` -- a dictionary of database settings **Return** - ``dbConns`` -- a database connection **Usage** Given a python dictionary that looks like this: ```python dbSettings = { 'host': '127.0.0.1', 'loginPath': 'atlasMovers', 'user': 'monster', 'tunnel': { 'remote ip': 'psweb.mp.qub.ac.uk', 'remote datbase host': 'dormammu', 'remote user': 'monster', 'port': 9006 }, 'password': 'myPass', 'db': 'atlas_moving_objects' } ``` ``loginPath`` and ``tunnel`` are optional, to setup the a database connection, run the following: ```python # SETUP ALL DATABASE CONNECTIONS from fundamentals.mysql import database dbConn = database( log=log, dbSettings=dbSettings ).connect() ``` """ # INITIALISATION def __init__(self, log, dbSettings=False, autocommit=True): self.log = log log.debug("instansiating a new '_database' object") self.dbSettings = dbSettings self.autocommit = autocommit return None
[docs] def connect(self): """*Connect to the database* **Return** - ``dbConn`` -- the database connection See the class docstring for usage """ self.log.debug("starting the ``connect`` method") import pymysql as ms dbSettings = self.dbSettings port = False if "tunnel" in dbSettings and dbSettings["tunnel"]: port = self._setup_tunnel(tunnelParameters=dbSettings["tunnel"]) elif "port" in dbSettings and dbSettings["port"]: port = int(dbSettings["port"]) # SETUP A DATABASE CONNECTION host = dbSettings["host"] user = dbSettings["user"] passwd = dbSettings["password"] dbName = dbSettings["db"] dbConn = ms.connect( host=host, user=user, passwd=passwd, db=dbName, port=port, use_unicode=True, charset="utf8mb4", local_infile=1, client_flag=ms.constants.CLIENT.MULTI_STATEMENTS, connect_timeout=36000, max_allowed_packet=51200000, ) if self.autocommit: dbConn.autocommit(True) self.log.debug("completed the ``connect`` method") return dbConn
def _setup_tunnel(self, tunnelParameters): """ *Setup a ssh tunnel for a database connection to port through* **Key Arguments** - ``tunnelParameters`` -- the tunnel parameters found associated with the database settings **Return** - ``sshPort`` -- the port the ssh tunnel is connected via """ self.log.debug("starting the ``_setup_tunnel`` method") # TEST TUNNEL DOES NOT ALREADY EXIST sshPort = tunnelParameters["port"] connected = self._checkServer("127.0.0.1", sshPort) if connected: self.log.debug("ssh tunnel already exists - moving on") else: # GRAB TUNNEL SETTINGS FROM SETTINGS FILE ru = tunnelParameters["remote user"] rip = tunnelParameters["remote ip"] rh = tunnelParameters["remote datbase host"] cmd = "ssh -fnN %(ru)s@%(rip)s -L %(sshPort)s:%(rh)s:3306" % locals() p = Popen(cmd, shell=True, close_fds=True) output = p.communicate()[0] self.log.debug("output: %(output)s" % locals()) # TEST CONNECTION - QUIT AFTER SO MANY TRIES connected = False count = 0 while not connected: connected = self._checkServer("127.0.0.1", sshPort) time.sleep(1) count += 1 if count == 5: self.log.error( "cound not setup tunnel to remote datbase" % locals() ) sys.exit(0) return sshPort def _checkServer(self, address, port): """*Check that the TCP Port we've decided to use for tunnelling is available*""" self.log.debug("starting the ``_checkServer`` method") # CREATE A TCP SOCKET import socket s = socket.socket() self.log.debug( """Attempting to connect to `%(address)s` on port `%(port)s`""" % locals() ) try: s.connect((address, port)) self.log.debug( """Connected to `%(address)s` on port `%(port)s`""" % locals() ) return True except socket.error as e: self.log.warning( """Connection to `%(address)s` on port `%(port)s` failed - try again: %(e)s""" % locals() ) return False return None
# xt-class-method