Source code for bearsql.bearsql

"""Main module."""
from bearsql import logger
from duckdb import connect
from pandas import DataFrame
from typing import Optional, Union, Generator
import string
import random


[docs]class SqlContext: def __init__(self, table: Optional[str] = None, view: Optional[str] = None, database: Optional[str] = None): ''' Class to create sql context for pandas dataframes so as to add sql syntax. :param table: table name to be used. The value can be changed later :type table: Optional[str]; default None :param view: view name to be used. This value can be changed later :type view: Optional[str]; default None :param database: name of the database. A file based db will be created to store all the tables. Default is :memory:. The default database will persist only for a session. Post the session, it will be deleted. To persist the database, we can instantiate this class by passing the database name :type database: str; default None ''' self.db = database self._table = table self._view = view self.__conn = self.__engine() @property def table(self): ''' Table property of the class. This will give the name of the table that is currently in use ''' return self._table @table.setter def table(self, table: str): ''' Method to set new table name ''' if not table: raise Exception('Expected table name, got None') logger.info(f'Setting table name to {table}') self._table = table @property def view(self): ''' View property of the class. This will give the name of the view that is currently in use ''' return self._view @view.setter def view(self, view: str): ''' Method to set new view name ''' if not view: raise Exception('Expected view name, got None') logger.info(f'Setting view name to {view}') self._view = view def __engine(self): ''' Private method to connect to duckdb returns: duckdb connection object :rtype: duckdb ''' if not self.db: self.db = ':memory:' logger.info(f'Database created at {self.db}') return connect(self.db, read_only=False)
[docs] def register_table(self, df: DataFrame, table: Optional[str] = None) -> None: ''' This method creates a table in the database with pandas dataframe as the input. To create a table, a view must be created. If there is no view name specified in this class, a new random view name will be generated :param df: pandas input dataframe :type df: DataFrame :param table: table name; the dataframe will sit in the database and can be referenced using this table name :type table: Optional[str]; default None ''' view = self.view if self.view else ''.join( random.choice(string.ascii_lowercase) for i in range(10)) self.view = view if table: self.table = table logger.info(f'Creating table {self.table}') self.register_view(df, view) self.__conn.execute( f'CREATE TABLE {self.table} as SELECT * FROM {view}') logger.info(f'Table {self.table} created')
[docs] def register_view(self, df: DataFrame, view: Optional[str] = None) -> None: ''' This method creates a view in the database with pandas dataframe as the input. If there is no view name is passed and not specified in this class, an exception will be thrown :param df: pandas input dataframe :type df: DataFrame :param view: table name; the dataframe will sit in the database and can be referenced using this table name :type view: Optional[str]; default None ''' view = view if view else self.view if not view: raise Exception('No view name found! Please pass view name') self.view = view self.__conn.register(self.view, df) logger.info(f'View {self.view} created')
[docs] def sql(self, query: Union[str, list], output: str = 'df') -> Generator: ''' Method to run sql queries on pandas dataframe. :param query: sql query to execute on pandas dataframe. It can be one single query or a list of multiple queries :type query: Union[str, list] :param output: Output format of the query results. This can either be df, arrow or any :type output: str; default df returns: Generator object containing all the query results :rtype: Generator ''' logger.info(f'Starting query execution') if isinstance(query, str): query = [query] for sql in query: logger.info(f'[EXECUTING] {sql}') self.__conn.execute(sql) if output == 'df': logger.info(f'Fetching output in dataframe format') yield self.__conn.fetch_df() if output == 'arrow': yield self.__conn.fetch_arrow_table() else: yield self.__conn.fetchall()
[docs] def relation(self, df: DataFrame, table: Optional[str] = None): ''' Create a relational table on top of pandas dataframe. If tagged with a table name, this name can be used to run sql queries. :param df: pandas input dataframe :type df: DataFrame :param table: name of the table :type table: Optional[str]; default None returns: duckdb relation :rtype: duckdb ''' logger.info(f'Creating relation from dataframe') rel = self.__conn.from_df(df) table = table if table else f'table_{random.randint(10,50)}' self.table = table rel.set_alias(self.table) logger.info( f'Table can now be referenced using alias {self.table}') return rel
[docs] def close(self): ''' Method to close database connection ''' logger.info('Closing connection') self.__conn.close() logger.info('Connection closed')