Source code for fundamentals.renderer.list_of_dictionaries

#!/usr/local/bin/python
# encoding: utf-8
"""
*Render a python list of dictionaries in various list and markup formats*

Author
: David Young
"""

from builtins import str
from builtins import range
from builtins import object
import sys
import os
import io
import re
import codecs
import copy
import json
import yaml
from decimal import Decimal
from datetime import datetime

os.environ["TERM"] = "vt100"
from fundamentals import tools
from fundamentals.mysql import convert_dictionary_to_mysql_table


[docs] class list_of_dictionaries(object): """ *The dataset object is a list of python dictionaries. Using this class, the data can be rendered as various list and markup formats* **Key Arguments** - ``log`` -- logger - ``listOfDictionaries`` -- the list of dictionaries to render - ``reDatetime`` -- a pre-compiled datetime regex. Default *False*fss **Usage** To initialise the dataset object: ```python dataList = [ { "owner": "daisy", "pet": "dog", "address": "belfast, uk" }, { "owner": "john", "pet": "snake", "address": "the moon" }, { "owner": "susan", "pet": "crocodile", "address": "larne" } ] from fundamentals.renderer import list_of_dictionaries dataSet = list_of_dictionaries( log=log, listOfDictionaries=dataList ) ``` """ def __init__(self, log, listOfDictionaries, reDatetime=False): self.log = log self.log.debug("instansiating a new 'list_of_dictionaries' object") self.listOfDictionaries = listOfDictionaries self.reDatetime = reDatetime return None @property def list(self): """*Returns the original list of dictionaries* **Usage** dataSet.list """ return self.listOfDictionaries
[docs] def csv(self, filepath=None): """*Render the data in CSV format* **Key Arguments** - ``filepath`` -- path to the file to write the csv content to. Default *None* **Return** - ``renderedData`` -- the data rendered in csv format **Usage** To render the data set as csv: ```python print(dataSet.csv()) ``` ```text owner,pet,address daisy,dog,"belfast, uk" john,snake,the moon susan,crocodile,larne ``` and to save the csv rendering to file: ```python dataSet.csv("/path/to/myfile.csv") ``` """ self.log.debug("starting the ``csv`` method") renderedData = self._list_of_dictionaries_to_csv("machine") if filepath and renderedData != "NO MATCH": # RECURSIVELY CREATE MISSING DIRECTORIES if not os.path.exists(os.path.dirname(filepath)): os.makedirs(os.path.dirname(filepath)) writeFile = codecs.open(filepath, encoding="utf-8", mode="w") writeFile.write(renderedData) writeFile.close() self.log.debug("completed the ``csv`` method") return renderedData
[docs] def table(self, filepath=None): """*Render the data as a plain text table* **Key Arguments** - ``filepath`` -- path to the file to write the table to. Default *None* **Return** - ``renderedData`` -- the data rendered as a plain text table **Usage** To render the data set as a plain text table: ```python print(dataSet.table()) ``` ```text +--------+------------+--------------+ | owner | pet | address | +========+============+==============+ | daisy | dog | belfast, uk | | john | snake | the moon | | susan | crocodile | larne | +--------+------------+--------------+ ``` and to save the table rendering to file: ```python dataSet.table("/path/to/myfile.ascii") ``` """ self.log.debug("starting the ``table`` method") self.filepath = filepath renderedData = self._list_of_dictionaries_to_csv("human") if filepath and len(self.listOfDictionaries): # RECURSIVELY CREATE MISSING DIRECTORIES if not os.path.exists(os.path.dirname(filepath)): os.makedirs(os.path.dirname(filepath)) writeFile = codecs.open(filepath, encoding="utf-8", mode="w") writeFile.write(renderedData) writeFile.close() self.log.debug("completed the ``table`` method") return renderedData
[docs] def reST(self, filepath=None): """*Render the data as a resturcturedText table* **Key Arguments** - ``filepath`` -- path to the file to write the table to. Default *None* **Return** - ``renderedData`` -- the data rendered as a resturcturedText table **Usage** To render the data set as a resturcturedText table: ```python print(dataSet.reST()) ``` ```text +--------+------------+--------------+ | owner | pet | address | +========+============+==============+ | daisy | dog | belfast, uk | +--------+------------+--------------+ | john | snake | the moon | +--------+------------+--------------+ | susan | crocodile | larne | +--------+------------+--------------+ ``` and to save the table rendering to file: ```python dataSet.reST("/path/to/myfile.rst") ``` """ self.log.debug("starting the ``table`` method") self.filepath = filepath renderedData = self._list_of_dictionaries_to_csv("reST") if filepath and len(self.listOfDictionaries): # RECURSIVELY CREATE MISSING DIRECTORIES if not os.path.exists(os.path.dirname(filepath)): os.makedirs(os.path.dirname(filepath)) writeFile = codecs.open(filepath, encoding="utf-8", mode="w") writeFile.write(renderedData) writeFile.close() self.log.debug("completed the ``table`` method") return renderedData
[docs] def markdown(self, filepath=None): """*Render the data as a markdown table* **Key Arguments** - ``filepath`` -- path to the file to write the markdown to. Default *None* **Return** - ``renderedData`` -- the data rendered as a markdown table **Usage** To render the data set as a markdown table: ```python print(dataSet.markdown()) ``` ```markdown | owner | pet | address | |:-------|:-----------|:-------------| | daisy | dog | belfast, uk | | john | snake | the moon | | susan | crocodile | larne | ``` and to save the markdown table rendering to file: ```python dataSet.table("/path/to/myfile.md") ``` """ self.log.debug("starting the ``markdown`` method") self.filepath = filepath renderedData = self._list_of_dictionaries_to_csv("markdown") if filepath and len(self.listOfDictionaries): # RECURSIVELY CREATE MISSING DIRECTORIES if not os.path.exists(os.path.dirname(filepath)): os.makedirs(os.path.dirname(filepath)) writeFile = codecs.open(filepath, encoding="utf-8", mode="w") writeFile.write(renderedData) writeFile.close() self.log.debug("completed the ``markdown`` method") return renderedData
[docs] def json(self, filepath=None): """*Render the data in json format* **Key Arguments** - ``filepath`` -- path to the file to write the json content to. Default *None* **Return** - ``renderedData`` -- the data rendered as json **Usage** To render the data set as json: ```python print(dataSet.json()) ``` ```json [ { "address": "belfast, uk", "owner": "daisy", "pet": "dog" }, { "address": "the moon", "owner": "john", "pet": "snake" }, { "address": "larne", "owner": "susan", "pet": "crocodile" } ] ``` and to save the json rendering to file: ```python dataSet.json("/path/to/myfile.json") ``` """ self.log.debug("starting the ``json`` method") dataCopy = copy.deepcopy(self.listOfDictionaries) for d in dataCopy: for k, v in list(d.items()): if isinstance(v, datetime): d[k] = v.strftime("%Y%m%dt%H%M%S") renderedData = json.dumps( dataCopy, separators=(",", ": "), sort_keys=True, indent=4 ) if filepath and len(self.listOfDictionaries): # RECURSIVELY CREATE MISSING DIRECTORIES if not os.path.exists(os.path.dirname(filepath)): os.makedirs(os.path.dirname(filepath)) writeFile = codecs.open(filepath, encoding="utf-8", mode="w") writeFile.write(renderedData) writeFile.close() self.log.debug("completed the ``json`` method") return renderedData
[docs] def yaml(self, filepath=None): """*Render the data in yaml format* **Key Arguments** - ``filepath`` -- path to the file to write the yaml content to. Default *None* **Return** - ``renderedData`` -- the data rendered as yaml **Usage** To render the data set as yaml: ```python print(dataSet.yaml()) ``` ```yaml - address: belfast, uk owner: daisy pet: dog - address: the moon owner: john pet: snake - address: larne owner: susan pet: crocodile ``` and to save the yaml rendering to file: ```python dataSet.json("/path/to/myfile.yaml") ``` """ self.log.debug("starting the ``yaml`` method") dataCopy = [] dataCopy[:] = [dict(l) for l in self.listOfDictionaries] renderedData = yaml.dump(dataCopy, default_flow_style=False) if filepath and len(self.listOfDictionaries): # RECURSIVELY CREATE MISSING DIRECTORIES if not os.path.exists(os.path.dirname(filepath)): os.makedirs(os.path.dirname(filepath)) stream = open(filepath, "w") yaml.dump(dataCopy, stream, default_flow_style=False) stream.close() self.log.debug("completed the ``yaml`` method") return renderedData
[docs] def mysql(self, tableName, filepath=None, createStatement=None): """*Render the dataset as a series of mysql insert statements* **Key Arguments** - ``tableName`` -- the name of the mysql db table to assign the insert statements to. - ``filepath`` -- path to the file to write the mysql inserts content to. Default *None* createStatement **Return** - ``renderedData`` -- the data rendered mysql insert statements (string format) **Usage** ```python print(dataSet.mysql("testing_table")) ``` this output the following: ```plain INSERT INTO `testing_table` (address,dateCreated,owner,pet) VALUES ("belfast, uk" ,"2016-09-14T16:21:36" ,"daisy" ,"dog") ON DUPLICATE KEY UPDATE address="belfast, uk", dateCreated="2016-09-14T16:21:36", owner="daisy", pet="dog" ; INSERT INTO `testing_table` (address,dateCreated,owner,pet) VALUES ("the moon" ,"2016-09-14T16:21:36" ,"john" ,"snake") ON DUPLICATE KEY UPDATE address="the moon", dateCreated="2016-09-14T16:21:36", owner="john", pet="snake" ; INSERT INTO `testing_table` (address,dateCreated,owner,pet) VALUES ("larne" ,"2016-09-14T16:21:36" ,"susan" ,"crocodile") ON DUPLICATE KEY UPDATE address="larne", dateCreated="2016-09-14T16:21:36", owner="susan", pet="crocodile" ; ``` To save this rendering to file use: ```python dataSet.mysql("testing_table", "/path/to/myfile.sql") ``` """ self.log.debug("starting the ``mysql`` method") import re if ( createStatement and "create table if not exists" not in createStatement.lower() ): regex = re.compile(r"^\s*CREATE TABLE ", re.I | re.S) createStatement = regex.sub("CREATE TABLE IF NOT EXISTS ", createStatement) renderedData = self._list_of_dictionaries_to_mysql_inserts( tableName=tableName, createStatement=createStatement ) if filepath and len(self.listOfDictionaries): # RECURSIVELY CREATE MISSING DIRECTORIES if not os.path.exists(os.path.dirname(filepath)): os.makedirs(os.path.dirname(filepath)) writeFile = open(filepath, mode="w") writeFile.write(renderedData) writeFile.close() self.log.debug("completed the ``mysql`` method") return renderedData
def _list_of_dictionaries_to_csv(self, csvType="human"): """Convert a python list of dictionaries to pretty csv output **Key Arguments** - ``csvType`` -- human, machine or reST **Return** - ``output`` -- the contents of a CSV file """ self.log.debug("starting the ``_list_of_dictionaries_to_csv`` function") import unicodecsv as csv if not len(self.listOfDictionaries): return "NO MATCH" dataCopy = copy.deepcopy(self.listOfDictionaries) tableColumnNames = list(dataCopy[0].keys()) columnWidths = [] columnWidths[:] = [ len(tableColumnNames[i]) for i in range(len(tableColumnNames)) ] output = io.BytesIO() # setup csv styles if csvType == "machine": delimiter = "," elif csvType in ["human", "markdown"]: delimiter = "|" elif csvType in ["reST"]: delimiter = "|" if csvType in ["markdown"]: writer = csv.writer( output, delimiter=delimiter, quoting=csv.QUOTE_NONE, doublequote=False, quotechar='"', escapechar="\\", lineterminator="\n", ) else: writer = csv.writer( output, dialect="excel", delimiter=delimiter, quotechar='"', quoting=csv.QUOTE_MINIMAL, lineterminator="\n", ) if csvType in ["markdown"]: dividerWriter = csv.writer( output, delimiter="|", quoting=csv.QUOTE_NONE, doublequote=False, quotechar='"', escapechar="\\", lineterminator="\n", ) else: dividerWriter = csv.writer( output, dialect="excel", delimiter="+", quotechar='"', quoting=csv.QUOTE_MINIMAL, lineterminator="\n", ) # add column names to csv header = [] divider = [] rstDivider = [] allRows = [] # clean up data for row in dataCopy: for c in tableColumnNames: if isinstance(row[c], float) or isinstance(row[c], Decimal): row[c] = "%0.9g" % row[c] elif isinstance(row[c], datetime): thisDate = str(row[c])[:10] row[c] = "%(thisDate)s" % locals() # set the column widths for row in dataCopy: for i, c in enumerate(tableColumnNames): if len(str(row[c])) > columnWidths[i]: columnWidths[i] = len(str(row[c])) # table borders for human readable if csvType in ["human", "markdown", "reST"]: header.append("") divider.append("") rstDivider.append("") for i, c in enumerate(tableColumnNames): if csvType == "machine": header.append(c) elif csvType in ["human", "markdown", "reST"]: header.append(c.ljust(columnWidths[i] + 2).rjust(columnWidths[i] + 3)) divider.append("-" * (columnWidths[i] + 3)) rstDivider.append("=" * (columnWidths[i] + 3)) # table border for human readable if csvType in ["human", "markdown", "reST"]: header.append("") divider.append("") rstDivider.append("") # fill in the data for row in dataCopy: thisRow = [] # table border for human readable if csvType in ["human", "markdown", "reST"]: thisRow.append("") for i, c in enumerate(tableColumnNames): if csvType in ["human", "markdown", "reST"]: if row[c] == None: row[c] = "" row[c] = str( str(row[c]) .ljust(columnWidths[i] + 2) .rjust(columnWidths[i] + 3) ) thisRow.append(row[c]) # table border for human readable if csvType in ["human", "markdown", "reST"]: thisRow.append("") allRows.append(thisRow) if csvType in ["reST"]: allRows.append(divider) if csvType == "machine": writer.writerow(header) if csvType in ["reST"]: dividerWriter.writerow(divider) writer.writerow(header) dividerWriter.writerow(rstDivider) if csvType in ["human"]: dividerWriter.writerow(divider) writer.writerow(header) dividerWriter.writerow(divider) elif csvType in ["markdown"]: writer.writerow(header) dividerWriter.writerow(divider) # write out the data writer.writerows(allRows) # table border for human readable if csvType in ["human"]: dividerWriter.writerow(divider) output = output.getvalue() output = output.strip() try: output = output.decode("UTF-8") except: output = str(output) if csvType in ["markdown"]: output = output.replace("|--", "|:-") if csvType in ["reST"]: output = output.replace("|--", "+--").replace("--|", "--+") self.log.debug("completed the ``_list_of_dictionaries_to_csv`` function") return output def _list_of_dictionaries_to_mysql_inserts(self, tableName, createStatement=None): """Convert a python list of dictionaries to pretty csv output **Key Arguments** - ``tableName`` -- the name of the table to create the insert statements for - ``createStatement`` -- add this create statement to the top of the file. Will only be executed if no table of that name exists in database. Default *None* **Return** - ``output`` -- the mysql insert statements (as a string) """ self.log.debug( "completed the ````_list_of_dictionaries_to_mysql_inserts`` function" ) if not len(self.listOfDictionaries): return "NO MATCH" dataCopy = copy.deepcopy(self.listOfDictionaries) if createStatement: output = createStatement + "\n" else: output = "" inserts = [] inserts = [] inserts[:] = [ convert_dictionary_to_mysql_table( log=self.log, dictionary=d, dbTableName=tableName, uniqueKeyList=[], dateModified=False, returnInsertOnly=True, replace=True, batchInserts=False, reDatetime=self.reDatetime, ) for d in dataCopy ] output += ";\n".join(inserts) + ";" self.log.debug( "completed the ``_list_of_dictionaries_to_mysql_inserts`` function" ) return output