Examples¶
Preparation¶
Basic preparation steps are required to see examples in action.
Install PyEXASOL with optional dependencies.
Download PyEXASOL source code and unzip it.
Make sure Exasol is installed and dedicated schema for testing is created. You may use free Exasol Community Edition for testing purposes.
Open the /examples/ directory and edit the file _config.py. Input your Exasol credentials. Run the following script to prepare the data set for testing:
python examples/a00_prepare.py
That’s all. Now you may run the examples in any order, like common Python scripts. E.g.:
python examples/a01_basic.py
Examples of core functions¶
Basic¶
"""
Open connection, run simple query, close connection
"""
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
# Basic connect
C = pyexasol.connect(
dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
# Basic query
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchall())
# Disconnect
C.close()
Fetching Tuples¶
"""
Fetch rows as tuples
"""
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
# Basic connect (default mapper)
C = pyexasol.connect(
dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
# Fetch tuples row-by-row as iterator
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
for row in stmt:
printer.pprint(row)
# Fetch tuples row-by-row with fetchone
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
while True:
row = stmt.fetchone()
if row is None:
break
printer.pprint(row)
# Fetch many
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchmany(3))
printer.pprint(stmt.fetchmany(3))
# Fetch everything in one go
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchall())
# Fetch one column as list of values
stmt = C.execute("SELECT user_id FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchcol())
# Fetch single value
stmt = C.execute("SELECT count(*) FROM users")
printer.pprint(stmt.fetchval())
Fetching Dictionaries¶
"""
Fetch rows as dictionaries
"""
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
# Basic connect (default mapper)
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
fetch_dict=True,
)
# Fetch tuples row-by-row as iterator
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
for row in stmt:
printer.pprint(row)
# Fetch tuples row-by-row with fetchone
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
while True:
row = stmt.fetchone()
if row is None:
break
printer.pprint(row)
# Fetch many
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchmany(3))
printer.pprint(stmt.fetchmany(3))
# Fetch everything in one go
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchall())
# Fetch one column as list of values (same as tuples example!)
stmt = C.execute("SELECT user_id FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchcol())
# Fetch single value (same as tuples example!)
stmt = C.execute("SELECT count(*) FROM users")
printer.pprint(stmt.fetchval())
Custom Data Type Mapper¶
"""
Use custom mapper to get python objects out of Exasol fetch methods
DECIMAL(p,0) -> int
DECIMAL(p,s) -> decimal.Decimal
DOUBLE -> float
DATE -> datetime.date
TIMESTAMP -> datetime.datetime
BOOLEAN -> bool
VARCHAR -> str
CHAR -> str
INTERVAL DAY TO SECOND -> datetime.timedelta (ExaTimeDelta)
<others> -> str
"""
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=180)
# Basic connect (custom mapper
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
fetch_mapper=pyexasol.exasol_mapper,
)
# Fetch objects
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchall())
# Test timestamp formats with different amount of decimal places
# Please note: Exasol stores timestamps with millisecond precision (3 decimal places)
# Lack of precision is not a bug, it's the documented feature
for i in range(0, 9):
C.execute(f"ALTER SESSION SET NLS_TIMESTAMP_FORMAT='YYYY-MM-DD HH24:MI:SS.FF{i}'")
printer.pprint(C.execute("SELECT TIMESTAMP'2018-01-01 03:04:05.123456'").fetchval())
# Test interval mapping
stmt = C.execute(
"SELECT id, from_ts, to_ts, expected_timedelta, cast(to_ts - from_ts as varchar(100)) as expected_interval, to_ts - from_ts AS ts_diff FROM interval_test"
)
for row in stmt.fetchall():
print(f"-------- Interval Test #{row[0]} --------")
print(f" FROM: {row[1]}")
print(f" TO: {row[2]}")
print(f"EXPECTED TIMEDELTA: {row[3]}")
actual_timedelta = printer.pformat(row[5])
if actual_timedelta == row[3]:
print(f" \033[1;32mACTUAL TIMEDELTA: {actual_timedelta} ✓\033[0m")
else:
print(f" \033[1;31mACTUAL TIMEDELTA: {actual_timedelta} ✗\033[0m")
print(f" EXPECTED INTERVAL: {row[4]}")
actual_interval = row[5].to_interval()
if actual_interval == row[4]:
print(f" \033[1;32mACTUAL INTERVAL: {actual_interval} ✓\033[0m")
else:
print(f" \033[1;31mACTUAL INTERVAL: {actual_interval} ✗\033[0m")
SQL Formatting¶
"""
Format values and identifiers using query_params and pyexasol formatter
"""
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=40)
# Basic connect
C = pyexasol.connect(
dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
# SQL with formatting
params = {
"random_value": "abc",
"null_value": None,
"table_name_1": "users",
"table_name_2": (config.schema, "PAYMENTS"),
"user_rating": "0.5",
"user_score": 1e1,
"is_female": "TRUE",
"user_statuses": ["ACTIVE", "PASSIVE", "SUSPENDED"],
"exclude_user_score": [10, 20],
"limit": 10,
}
query = """
SELECT {random_value} AS random_value, {null_value} AS null_value, u.user_id, sum(gross_amt) AS gross_amt
FROM {table_name_1!i} u
JOIN {table_name_2!q} p ON (u.user_id=p.user_id)
WHERE u.user_rating >= {user_rating!d}
AND u.user_score > {user_score!f}
AND u.is_female IS {is_female!r}
AND u.status IN ({user_statuses})
AND u.user_rating NOT IN ({exclude_user_score!d})
GROUP BY 1,2,3
ORDER BY 4 DESC
LIMIT {limit!d}
"""
stmt = C.execute(query, params)
print(stmt.query)
printer.pprint(stmt.fetchall())
Transaction Management¶
"""
Transactions
"""
import _config as config
import pyexasol
# Connect with autocommit OFF
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
autocommit=False,
)
# Another way to change autocommit after connection
C.set_autocommit(False)
stmt = C.execute("TRUNCATE TABLE users")
print(f"Truncate affected {stmt.rowcount()} rows")
C.rollback()
print("Truncate was rolled back")
stmt = C.execute("SELECT count(*) FROM users")
print(f"Select affected {stmt.fetchval()} rows")
C.commit()
print("Select was committed")
Error Handling¶
"""
Exceptions for basic queries
"""
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
# Bad dsn
try:
C = pyexasol.connect(
dsn="123" + config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
)
except pyexasol.ExaConnectionError as e:
print(e)
# Bad user \ password
try:
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password="123" + config.password,
schema=config.schema,
)
except pyexasol.ExaAuthError as e:
print(e)
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
fetch_size_bytes=1024 * 10,
)
# Invalid SQL
try:
stmt = C.execute(
"""
SELECT1 *
FROM users
ORDER BY user_id
LIMIT 5
"""
)
except pyexasol.ExaQueryError as e:
print(e)
# Valid SQL, but error during execution
try:
stmt = C.execute(
"""
SELECT *
FROM users
WHERE user_name = 10
ORDER BY user_id
LIMIT 5
"""
)
except pyexasol.ExaQueryError as e:
print(e)
# Attempt to read from closed cursor
stmt = C.execute("SELECT * FROM payments")
stmt.fetchone()
stmt.close()
try:
stmt.fetchall()
except pyexasol.ExaRequestError as e:
print(e)
# Attempt to fetch query without result set
stmt = C.execute("COMMIT")
try:
stmt.fetchone()
except pyexasol.ExaRuntimeError as e:
print(e)
# Attempt to run SELECT with duplicate column names
try:
stmt = C.execute(
"""
SELECT 1, 1, 2 AS user_id, 3 AS user_id
FROM dual
"""
)
except pyexasol.ExaRuntimeError as e:
print(e)
# Attempt to run query on closed connection
C.close()
try:
C.execute("SELECT 1")
except pyexasol.ExaRuntimeError as e:
print(e)
# Simulate websocket error during close
C1 = pyexasol.connect(
dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
C2 = pyexasol.connect(
dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
C2.execute(f"KILL SESSION {C1.session_id()}")
try:
C1.close()
except pyexasol.ExaError as e:
print(e)
Extension Functions¶
"""
Extension functions
Metadata functions are deprecated in favor of new "meta"-functions
"""
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
# Basic connect
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
lower_ident=True,
)
cols = C.ext.get_columns("users")
printer.pprint(cols)
cols = C.ext.get_columns_sql("SELECT * FROM users")
printer.pprint(cols)
cols = C.ext.get_sys_columns("users")
printer.pprint(cols)
tables = C.ext.get_sys_tables()
printer.pprint(tables)
views = C.ext.get_sys_views()
printer.pprint(views)
schemas = C.ext.get_sys_schemas()
printer.pprint(schemas)
reserved_words = C.ext.get_reserved_words()
printer.pprint(reserved_words[0:5])
occupied_space = C.ext.get_disk_space_usage()
printer.pprint(occupied_space)
pd = C.ext.export_to_pandas_with_dtype("users")
pd.info()
pd = C.ext.export_to_pandas_with_dtype("payments")
pd.info()
Abort Query¶
"""
Abort long running query from another thread
"""
import pprint
import threading
import time
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
class QueryThread(threading.Thread):
def __init__(self, connection):
self.connection = connection
super().__init__()
def run(self):
try:
# Run heavy query
self.connection.execute(
"SELECT * FROM users a, users b, users c, payments d"
)
except pyexasol.ExaQueryAbortError as e:
print(e.message)
# Basic connect
C = pyexasol.connect(
dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
# Start query thread
query_thread = QueryThread(C)
query_thread.start()
# Abort query after 1 second
time.sleep(1)
C.abort_query()
query_thread.join()
Context Manager¶
"""
Use context manager ("with" statement) for ExaConnection and ExaStatement objects
"""
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
# Basic usage
with pyexasol.connect(
dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
) as C:
with C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5") as stmt:
printer.pprint(stmt.fetchall())
printer.pprint(stmt.is_closed)
printer.pprint(C.is_closed)
# Exception causes connection and statement to be closed
try:
with pyexasol.connect(
dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
) as C:
with C.execute("SELECT * FROM unknown_table LIMIT 5") as stmt:
printer.pprint(stmt.fetchall())
except pyexasol.ExaQueryError as e:
print(e)
printer.pprint(stmt.is_closed)
printer.pprint(C.is_closed)
Insert Multiple Rows¶
"""
INSERT multiple rows using prepared statements
"""
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
C = pyexasol.connect(
dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
# Insert from list
C.execute("TRUNCATE TABLE users_copy")
all_users = C.execute("SELECT * FROM users").fetchall()
st = C.ext.insert_multi("users_copy", all_users)
print(f"INSERTED {st.rowcount()} rows in {st.execution_time}s")
# Insert from generator with shuffled order of columns
C.execute("TRUNCATE TABLE users_copy")
def users_generator():
for i in range(10000):
yield (i, "abcabc", "2019-01-01 01:02:03", "2019-02-01", "PENDING", False)
st = C.ext.insert_multi(
"users_copy",
users_generator(),
columns=[
"user_id",
"user_name",
"last_visit_ts",
"register_dt",
"status",
"is_female",
],
)
print(f"INSERTED {st.rowcount()} rows in {st.execution_time}s")
# Attempt to insert empty data leads to failure, unlike .import_from_iterable()
try:
C.ext.insert_multi("users_copy", [])
except pyexasol.ExaRuntimeError as e:
print(e)
Metadata Requests¶
"""
Lock-free meta data requests
"""
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
C = pyexasol.connect(
dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
# Get columns without executing SQL query
columns = C.meta.sql_columns("SELECT a.*, a.user_id + 1 AS next_user_id FROM users a")
printer.pprint(columns)
# Schema exists
val = C.meta.schema_exists(C.current_schema())
print(f"Schema exists: {val}")
# Schema does not exist
val = C.meta.schema_exists("abcabcabc")
print(f"Schema exists: {val}")
# Table exists
val = C.meta.table_exists("users")
print(f"Table exists: {val}")
# Table exists (with schema name)
val = C.meta.table_exists((C.current_schema(), "users"))
print(f"Table exists: {val}")
# Table does not exist
val = C.meta.table_exists("abcabcabc")
print(f"Table exists: {val}")
# View exists
val = C.meta.view_exists("users_view")
print(f"View exists: {val}")
# View exists (with schema name)
val = C.meta.view_exists((C.current_schema(), "users_view"))
print(f"View exists: {val}")
# View does not exist
val = C.meta.view_exists("abcabcabc")
print(f"View exists: {val}")
# List schemas
val = C.meta.list_schemas()
printer.pprint(val)
# List schemas with filters
val = C.meta.list_schemas(schema_name_pattern="PYEXASOL%")
printer.pprint(val)
# List tables with filters
val = C.meta.list_tables(table_schema_pattern="PYEXASOL%", table_name_pattern="USERS%")
printer.pprint(val)
# List views with filters
val = C.meta.list_views(view_schema_pattern="PYEXASOL%", view_name_pattern="USERS%")
printer.pprint(val)
# List columns with filters
val = C.meta.list_columns(
column_schema_pattern="PYEXASOL%",
column_table_pattern="USERS%",
column_object_type_pattern="TABLE",
column_name_pattern="%_ID%",
)
printer.pprint(val)
# List objects with filters
val = C.meta.list_objects(object_name_pattern="USERS%", object_type_pattern="TABLE")
printer.pprint(val)
# List object sizes with filters
val = C.meta.list_object_sizes(
object_name_pattern="USERS%", object_type_pattern="TABLE"
)
printer.pprint(val)
# List indices
val = C.meta.list_indices(index_schema_pattern="PYEXASOL%")
printer.pprint(val)
# List keywords
val = C.meta.list_sql_keywords()
printer.pprint(val)
No-SQL Metadata¶
"""
No SQL lock-free meta data requests introduced in Exasol 7.0, WebSocket protocol v2
"""
import pprint
import sys
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
C = pyexasol.connect(
dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
if C.protocol_version() < pyexasol.PROTOCOL_V2:
print("Actual protocol version is less than 2, skipping meta_nosql checks")
sys.exit(0)
# Schema exists
val = C.meta.schema_exists(C.current_schema())
print(f"Schema exists: {val}")
# Schema does not exist
val = C.meta.schema_exists("abcabcabc")
print(f"Schema exists: {val}")
# Table exists
val = C.meta.table_exists("users")
print(f"Table exists: {val}")
# Table exists (with schema name)
val = C.meta.table_exists((C.current_schema(), "users"))
print(f"Table exists: {val}")
# Table does not exist
val = C.meta.table_exists("abcabcabc")
print(f"Table exists: {val}")
# View exists
val = C.meta.view_exists("users_view")
print(f"View exists: {val}")
# View exists (with schema name)
val = C.meta.view_exists((C.current_schema(), "users_view"))
print(f"View exists: {val}")
# View does not exist
val = C.meta.view_exists("abcabcabc")
print(f"View exists: {val}")
# List schemas
st = C.meta.execute_meta_nosql("getSchemas")
printer.pprint(st.fetchall())
# List schemas with filters
st = C.meta.execute_meta_nosql("getSchemas", {"schema": "PYEXASOL%"})
printer.pprint(st.fetchall())
# List tables with filters
st = C.meta.execute_meta_nosql(
"getTables",
{
"schema": "PYEXASOL%",
"table": "USERS%",
"tableTypes": ["TABLE"],
},
)
printer.pprint(st.fetchall())
# List views with filters
st = C.meta.execute_meta_nosql(
"getTables",
{
"schema": "PYEXASOL%",
"table": "USERS%",
"tableTypes": ["VIEW"],
},
)
printer.pprint(st.fetchall())
# List columns with filters
st = C.meta.execute_meta_nosql(
"getColumns",
{
"schema": "PYEXASOL%",
"table": "USERS%",
"column": "%_ID%",
},
)
printer.pprint(st.fetchall())
# List keywords
val = C.meta.list_sql_keywords()
printer.pprint(val[0:10])
# Exception handling
try:
st = C.meta.execute_meta_nosql(
"getColumns",
{
"schema": "PYEXASOL%",
"table": "USERS%",
"column": ["%_ID%"],
},
)
except pyexasol.ExaRequestError as e:
print(e)
Examples of HTTP transport¶
Pandas DataFrame¶
"""
HTTP Transport
EXPORT and IMPORT from Exasol to Pandas DataFrames
Make sure to enable compression for Wifi connections to improve performance
"""
import _config as config
import pyexasol
# Connect with compression enabled
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
compression=True,
)
C.execute("TRUNCATE TABLE users_copy")
# Export from Exasol table into pandas.DataFrame
pd = C.export_to_pandas("users")
pd.info()
stmt = C.last_statement()
print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
# Import from pandas DataFrame into Exasol table
C.import_from_pandas(pd, "users_copy")
stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
# Export from SQL query
pd = C.export_to_pandas("SELECT user_id, user_name FROM users WHERE user_id >= 5000")
pd.info()
stmt = C.last_statement()
print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
Other Methods¶
"""
HTTP transport
EXPORT and IMPORT from Exasol to and from files / lists, etc.
"""
import os
import shutil
import tempfile
import _config as config
import pyexasol
# Connect with compression enabled
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
compression=True,
)
# Prepare empty tables
C.execute("TRUNCATE TABLE users_copy")
C.execute("TRUNCATE TABLE users_copy_reordered")
C.execute("TRUNCATE TABLE payments_copy")
# Create temporary file
file = tempfile.TemporaryFile()
# Export to temporary file
C.export_to_file(
file,
"users",
export_params={
"with_column_names": True,
"comment": "Exporting users to temp file",
},
)
file.seek(0)
print(file.readline())
print(file.readline())
print(file.readline())
file.seek(0)
stmt = C.last_statement()
print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
# Import from temporary file
C.import_from_file(file, "users_copy", import_params={"skip": 1})
stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
file.close()
# Export to list
users = C.export_to_list("users")
print(users[0])
print(users[1])
stmt = C.last_statement()
print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
# Import from list (or any other iterable)
C.import_from_iterable(users, "users_copy")
stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
# Export to custom callback
def my_export_callback(pipe, dst):
    """Read the first two raw lines from the EXPORT pipe and discard the rest.

    Parameters:
        pipe: binary file-like object fed by the HTTP transport (read end).
        dst:  unused; required by the export callback interface.

    Returns:
        list with the first two lines as bytes.
    """
    lines = list()

    # Save 2 first lines
    lines.append(pipe.readline())
    lines.append(pipe.readline())

    # Dump everything else to /dev/null; "with" guarantees the handle is
    # closed even if copyfileobj raises (original leaked it on error)
    with open(os.devnull, "wb") as dev_null:
        shutil.copyfileobj(pipe, dev_null)

    return lines
# Run the EXPORT through the custom callback; the second argument (dst) is unused
res = C.export_to_callback(my_export_callback, None, "users")
print(res)

stmt = C.last_statement()
print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
# Import from custom callback
def my_import_callback(pipe, src):
    """Stream every chunk from *src* into the IMPORT pipe, one write per chunk."""
    # Hoist the bound method; semantics identical to a plain write-per-line loop
    write = pipe.write
    for chunk in src:
        write(chunk)
C.import_from_callback(my_import_callback, res, "users_copy")

stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

# Export as gzipped file
file = tempfile.TemporaryFile()
C.export_to_file(
    file, "users", export_params={"with_column_names": True, "format": "gz"}
)

file.seek(0)
# First 100 bytes are compressed binary data, not readable CSV
print(file.read(100))
file.seek(0)

stmt = C.last_statement()
print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

# Import gzipped file
C.import_from_file(file, "users_copy", import_params={"skip": 1, "format": "gz"})
stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

file.close()

# Custom encoding for IMPORT and EXPORT
file = tempfile.TemporaryFile()
C.export_to_file(file, "users", export_params={"encoding": "WINDOWS-1251"})

file.seek(0)
print(file.readline())
file.seek(0)

# Import file with custom encoding
C.import_from_file(file, "users_copy", import_params={"encoding": "WINDOWS-1251"})
stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

file.close()

# Custom columns list for EXPORT
file = tempfile.TemporaryFile()
C.export_to_file(
    file,
    "users",
    export_params={"columns": ["register_dt", "user_id", "status", "user_name"]},
)

file.seek(0)
print(file.readline())
file.seek(0)

# Custom columns list for IMPORT (column order matches the export above)
C.import_from_file(
    file,
    "users_copy_reordered",
    import_params={"columns": ["register_dt", "user_id", "status", "user_name"]},
)
stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

file.close()

# Custom CSV cols formatting (per-column FORMAT clauses, ranges like "4..6")
file = tempfile.TemporaryFile()
C.export_to_file(
    file,
    "users",
    export_params={
        "csv_cols": [
            "1",
            "2",
            "3 FORMAT='DD-MM-YYYY'",
            "4..6",
            "7 FORMAT='999.99999'",
            "8",
        ]
    },
)

file.seek(0)
print(file.readline())
file.seek(0)

C.import_from_file(
    file,
    "users_copy",
    import_params={
        "csv_cols": [
            "1",
            "2",
            "3 FORMAT='DD-MM-YYYY'",
            "4..6",
            "7 FORMAT='999.99999'",
            "8",
        ]
    },
)
stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
Parallel Export¶
"""
Parallel HTTP transport
EXPORT into multiple independent processes running in parallel
"""
import multiprocessing
import pprint
import _config as config
import pyexasol
import pyexasol.callback as cb
printer = pprint.PrettyPrinter(indent=4, width=140)
class ExportProc(multiprocessing.Process):
def __init__(self, node):
self.node = node
self.read_pipe, self.write_pipe = multiprocessing.Pipe(False)
super().__init__()
def start(self):
super().start()
self.write_pipe.close()
@property
def exa_address(self):
return self.read_pipe.recv()
def run(self):
self.read_pipe.close()
# Init HTTP transport connection
http = pyexasol.http_transport(self.node["ipaddr"], self.node["port"])
# Send internal Exasol address to parent process
self.write_pipe.send(http.exa_address)
self.write_pipe.close()
# Read data from HTTP transport to DataFrame
pd = http.export_to_callback(cb.export_to_pandas, None)
print(f"Child process {self.node['idx']} finished, exported rows: {len(pd)}")
if __name__ == "__main__":
pool_size = 5
pool = []
exa_address_list = []
C = pyexasol.connect(
dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
for n in C.get_nodes(pool_size):
proc = ExportProc(n)
proc.start()
pool.append(proc)
exa_address_list.append(proc.exa_address)
printer.pprint(pool)
printer.pprint(exa_address_list)
try:
C.export_parallel(
exa_address_list,
"SELECT * FROM payments",
export_params={"with_column_names": True},
)
except (Exception, KeyboardInterrupt):
for p in pool:
p.terminate()
else:
stmt = C.last_statement()
print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
finally:
for p in pool:
p.join()
Parallel Import¶
"""
Parallel HTTP transport
IMPORT from multiple independent processes running in parallel
"""
import multiprocessing
import pprint
import _config as config
import pandas
import pyexasol
import pyexasol.callback as cb
printer = pprint.PrettyPrinter(indent=4, width=140)
class ImportProc(multiprocessing.Process):
def __init__(self, node):
self.node = node
self.read_pipe, self.write_pipe = multiprocessing.Pipe(False)
super().__init__()
def start(self):
super().start()
self.write_pipe.close()
@property
def exa_address(self):
return self.read_pipe.recv()
def run(self):
self.read_pipe.close()
# Init HTTP transport connection
http = pyexasol.http_transport(self.node["ipaddr"], self.node["port"])
# Send internal Exasol address to parent process
self.write_pipe.send(http.exa_address)
self.write_pipe.close()
data = [
{"user_id": 1, "user_name": "John", "shard_id": self.node["idx"]},
{"user_id": 2, "user_name": "Foo", "shard_id": self.node["idx"]},
{"user_id": 3, "user_name": "Bar", "shard_id": self.node["idx"]},
]
pd = pandas.DataFrame(data, columns=["user_id", "user_name", "shard_id"])
# Send data from DataFrame to HTTP transport
http.import_from_callback(cb.import_from_pandas, pd)
print(f"Child process {self.node['idx']} finished, imported rows: {len(pd)}")
if __name__ == "__main__":
pool_size = 5
pool = []
exa_address_list = []
C = pyexasol.connect(
dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
C.execute("TRUNCATE TABLE parallel_import")
for n in C.get_nodes(pool_size):
proc = ImportProc(n)
proc.start()
pool.append(proc)
exa_address_list.append(proc.exa_address)
printer.pprint(pool)
printer.pprint(exa_address_list)
try:
C.import_parallel(exa_address_list, "parallel_import")
except (Exception, KeyboardInterrupt):
for p in pool:
p.terminate()
else:
stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
finally:
for p in pool:
p.join()
Parallel Export/Import¶
"""
Parallel HTTP transport
EXPORT data, process it and IMPORT back to Exasol
Do it in multiple independent processes running in parallel
Compression and encryption are enabled in this example
"""
import multiprocessing
import pprint
import _config as config
import pyexasol
import pyexasol.callback as cb
printer = pprint.PrettyPrinter(indent=4, width=140)
class ExportProc(multiprocessing.Process):
def __init__(self, node):
self.node = node
self.read_pipe, self.write_pipe = multiprocessing.Pipe(False)
super().__init__()
def start(self):
super().start()
self.write_pipe.close()
@property
def exa_address_pair(self):
return self.read_pipe.recv()
def run(self):
self.read_pipe.close()
# Init separate HTTP transport connections for EXPORT and IMPORT
http_export = pyexasol.http_transport(
self.node["ipaddr"], self.node["port"], compression=True, encryption=True
)
http_import = pyexasol.http_transport(
self.node["ipaddr"], self.node["port"], compression=True, encryption=True
)
# Send pairs of internal Exasol address to parent process
self.write_pipe.send([http_export.exa_address, http_import.exa_address])
self.write_pipe.close()
# Read data from HTTP transport to DataFrame
pd = http_export.export_to_callback(cb.export_to_pandas, None)
print(
f"EXPORT child process {self.node['idx']} finished, exported rows:{len(pd)}"
)
# Modify data set
pd["GROSS_AMT"] = pd["GROSS_AMT"] + 1
# Write data back to HTTP transport
http_import.import_from_callback(cb.import_from_pandas, pd)
print(
f"IMPORT child process {self.node['idx']} finished, imported rows:{len(pd)}"
)
if __name__ == "__main__":
pool_size = 8
pool = []
exa_address_export = []
exa_address_import = []
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
compression=True,
encryption=True,
)
C.execute("TRUNCATE TABLE payments_copy")
for i in C.get_nodes(pool_size):
proc = ExportProc(i)
proc.start()
pool.append(proc)
pair = proc.exa_address_pair
exa_address_export.append(pair[0])
exa_address_import.append(pair[1])
printer.pprint(pool)
printer.pprint(exa_address_export)
printer.pprint(exa_address_import)
try:
C.export_parallel(
exa_address_export,
"SELECT * FROM payments",
export_params={"with_column_names": True},
)
except (Exception, KeyboardInterrupt):
for p in pool:
p.terminate()
p.join()
else:
stmt = C.last_statement()
print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
try:
C.import_parallel(exa_address_import, "payments_copy")
except (Exception, KeyboardInterrupt):
for p in pool:
p.terminate()
p.join()
else:
stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
for p in pool:
p.join()
HTTP Transport Errors¶
"""
Parallel HTTP transport
Edge cases, killing & failing various components at different times
"""
import os
import pprint
import shutil
import threading
import time
import traceback
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
dev_null = open(os.devnull, "wb")
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
debug=False,
)
###
# Normal execution
###
def observer_callback(pipe, dst, **kwargs):
print(f"Threads running: {threading.active_count()}")
shutil.copyfileobj(pipe, dev_null)
return
C.export_to_callback(observer_callback, None, "SELECT * FROM users LIMIT 1000")
print("--- Finished Observer callback (normal execution) ---\n")
###
# SQL error
###
try:
C.export_to_callback(observer_callback, None, "SELECT * FROM usersaaa LIMIT 1000")
except Exception as e:
traceback.print_exc()
print("--- Finished Observer callback (SQL error) ---\n")
###
# Abort SQL query
###
def abort_query_callback(pipe, dst, **kwargs):
C.abort_query()
time.sleep(2)
shutil.copyfileobj(pipe, dev_null)
return
try:
C.export_to_callback(abort_query_callback, None, "SELECT * FROM users LIMIT 1000")
except pyexasol.ExaError as e:
traceback.print_exc()
print("--- Finished Abort Query ---\n")
###
# Error from callback EXPORT
###
def runtime_error_callback(pipe, dst, **kwargs):
pipe.read(10)
time.sleep(1)
raise RuntimeError("Test error!")
try:
C.export_to_callback(runtime_error_callback, None, "SELECT * FROM users LIMIT 1000")
except Exception as e:
traceback.print_exc()
print("--- Finished Runtime Error EXPORT Callback ---\n")
###
# Error from callback IMPORT
###
def runtime_error_callback(pipe, src, **kwargs):
pipe.write(b"a,b,c,d")
time.sleep(1)
raise RuntimeError("Test error!")
try:
C.import_from_callback(runtime_error_callback, None, "users_copy")
except Exception as e:
traceback.print_exc()
print("--- Finished Runtime Error IMPORT Callback ---\n")
###
# Close WS connection
###
def close_connection_callback(pipe, dst, **kwargs):
C.close(disconnect=False)
time.sleep(1)
shutil.copyfileobj(pipe, dev_null)
return
try:
C.export_to_callback(
close_connection_callback, None, "SELECT * FROM users LIMIT 1000"
)
except Exception as e:
traceback.print_exc()
print("--- Finished Close Connection ---\n")
Examples of misc functions¶
Connection Redundancy¶
"""
Connection redundancy, attempts to connect to all hosts from DSN in random order
"""
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
# Basic connect
C = pyexasol.connect(
dsn="0.42.42.40..49," + config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
connection_timeout=2,
debug=True,
)
Edge Cases¶
"""
Try to read and write minimum and maximum possible values, test various edge cases
"""
import decimal
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
# Basic connect
C = pyexasol.connect(
dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
edge_cases = [
# Biggest values
{
"dec36_0": decimal.Decimal("+" + ("9" * 36)),
"dec36_36": decimal.Decimal("+0." + ("9" * 36)),
"dbl": 1.7e308,
"bl": True,
"dt": "9999-12-31",
"ts": "9999-12-31 23:59:59.999",
"var100": "ひ" * 100,
"var2000000": "ひ" * 2000000,
},
# Smallest values
{
"dec36_0": decimal.Decimal("-" + ("9" * 36)),
"dec36_36": decimal.Decimal("-0." + ("9" * 36)),
"dbl": -1.7e308,
"bl": False,
"dt": "0001-01-01",
"ts": "0001-01-01 00:00:00",
"var100": "",
"var2000000": "ひ",
},
# All nulls
{
"dec36_0": None,
"dec36_36": None,
"dbl": None,
"bl": None,
"dt": None,
"ts": None,
"var100": None,
"var2000000": None,
},
]
insert_q = "INSERT INTO edge_case VALUES ({dec36_0!d}, {dec36_36!d}, {dbl!f}, {bl}, {dt}, {ts}, {var100}, {var2000000})"
select_q = "SELECT dec36_0, dec36_36, dbl, bl, dt, ts, var100, LENGTH(var2000000) AS len_var FROM edge_case"
C.execute("TRUNCATE TABLE edge_case")
# Insert (test formatting)
C.execute(insert_q, dict(edge_cases[0]))
C.execute(insert_q, dict(edge_cases[1]))
C.execute(insert_q, dict(edge_cases[2]))
# Select and fetch
stmt = C.execute(select_q)
printer.pprint(stmt.fetchall())
# Same actions with "exasol_mapper"
C.options["fetch_mapper"] = pyexasol.exasol_mapper
C.execute("TRUNCATE TABLE edge_case")
# Insert (test formatting)
C.execute(insert_q, dict(edge_cases[0]))
C.execute(insert_q, dict(edge_cases[1]))
C.execute(insert_q, dict(edge_cases[2]))
# Select and fetch
stmt = C.execute(select_q)
printer.pprint(stmt.fetchall())
# Import and export
edge_tuples = C.execute("SELECT * FROM edge_case").fetchall()
C.execute("TRUNCATE TABLE edge_case")
C.import_from_iterable(edge_tuples, "edge_case")
stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
res = C.export_to_list(select_q)
stmt = C.last_statement()
print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
printer.pprint(res)
# resultSetHandle and data in the same response
stmt = C.execute("SELECT a.* FROM edge_case a, edge_case b, edge_case c, edge_case d")
print(f"Rows total: {stmt.num_rows_total}, rows chunk: {stmt.num_rows_chunk}")
print(f"Rows actually returned {sum(1 for _ in stmt)}")
# Very large query
stmt = C.execute(
"SELECT {val1} AS val1, {val2} AS val2, {val3} AS val3, {val4} AS val4, {val5} AS val5",
{
"val1": edge_cases[0]["var2000000"],
"val2": edge_cases[0]["var2000000"],
"val3": edge_cases[0]["var2000000"],
"val4": edge_cases[0]["var2000000"],
"val5": edge_cases[0]["var2000000"],
},
)
print(f"Query length: {len(stmt.query)}")
print(f"Result column length: {len(stmt.fetchone()[0])}")
# Very large query with compression
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
compression=True,
encryption=True,
)
stmt = C.execute(
"SELECT {val1} AS val1, {val2} AS val2, {val3} AS val3, {val4} AS val4, {val5} AS val5",
{
"val1": edge_cases[0]["var2000000"],
"val2": edge_cases[0]["var2000000"],
"val3": edge_cases[0]["var2000000"],
"val4": edge_cases[0]["var2000000"],
"val5": edge_cases[0]["var2000000"],
},
)
print(f"Query length: {len(stmt.query)}")
print(f"Result column length: {len(stmt.fetchone()[0])}")
DB-API 2.0 Compatibility¶
"""
Basic compatibility with DB-API 2.0
Suitable for temporary testing only, should not be used in production
"""
import pprint
import _config as config
import pyexasol.db2
printer = pprint.PrettyPrinter(indent=4, width=140)
C = pyexasol.db2.connect(
dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
cur = C.cursor()
# Fetch tuples row-by-row as iterator
cur.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
while True:
row = cur.fetchone()
if row is None:
break
printer.pprint(row)
# Fetch many
cur.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
printer.pprint(cur.fetchmany(3))
printer.pprint(cur.fetchmany(3))
# Fetch everything in one go
cur.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
printer.pprint(cur.fetchall())
printer.pprint(cur.description)
printer.pprint(cur.rowcount)
# Autocommit is False by default
print(C.attr["autocommit"])
C.commit()
SSL Encryption¶
"""
Connection with SSL encryption enabled
It works both for WebSocket communication (wss://) and HTTP(S) Transport
"""
import hashlib
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
# Connect with encryption
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
encryption=True,
)
# Basic query
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchall())
# Export to list
users = C.export_to_list("SELECT * FROM users ORDER BY user_id LIMIT 5")
stmt = C.last_statement()
print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
print(users[0])
print(users[1])
# Import from list
C.import_from_iterable(users, "users_copy")
stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
# Connect with encryption AND certificate fingerprint check
server_fingerprint = hashlib.sha256(C._ws.sock.getpeercert(True)).hexdigest().upper()
print(f"Server certificate fingerprint: {server_fingerprint}")
if ":" in config.dsn:
dsn_with_valid_fingerprint = config.dsn.replace(":", f"/{server_fingerprint}:")
dsn_with_invalid_fingerprint = config.dsn.replace(":", f"/123abc:")
else:
dsn_with_valid_fingerprint = f"{config.dsn}/{server_fingerprint}"
dsn_with_invalid_fingerprint = f"{config.dsn}/123abc"
C = pyexasol.connect(
dsn=dsn_with_valid_fingerprint,
user=config.user,
password=config.password,
schema=config.schema,
encryption=True,
)
print(f"Encrypted connection with fingerprint validation was established")
# Invalid fingerprint causes exception
try:
pyexasol.connect(
dsn=dsn_with_invalid_fingerprint,
user=config.user,
password=config.password,
schema=config.schema,
encryption=True,
)
except pyexasol.ExaConnectionFailedError as e:
print(e)
# Valid fingerprint without encryption causes exception
try:
pyexasol.connect(
dsn=dsn_with_valid_fingerprint,
user=config.user,
password=config.password,
schema=config.schema,
)
except pyexasol.ExaConnectionDsnError as e:
print(e)
Custom Session Parameters¶
"""
Custom client name, client version, other session parameters
Useful for logging
"""
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
# Normal session
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
fetch_dict=True,
)
st = C.execute("SELECT * FROM EXA_DBA_SESSIONS WHERE session_id=CURRENT_SESSION")
printer.pprint(st.fetchall())
C.close()
# Modified session
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
fetch_dict=True,
client_name="MyCustomClient",
client_version="1.2.3",
client_os_username="small_cat",
)
st = C.execute("SELECT * FROM EXA_DBA_SESSIONS WHERE session_id=CURRENT_SESSION")
printer.pprint(st.fetchall())
C.close()
Local Config File¶
"""
Local config file
"""
import configparser
import pathlib
import pprint
import tempfile
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
# Generate tmp file with sample config
with tempfile.TemporaryDirectory() as tempdir:
tempdir = pathlib.Path(tempdir)
handle = open(tempdir / "test.ini", "w+", encoding="utf-8")
parser = configparser.ConfigParser()
parser["test1"] = {
"dsn": config.dsn,
"user": config.user,
"password": config.password,
"schema": config.schema,
"compression": True,
"encryption": False,
"socket_timeout": 20,
}
parser.write(handle)
handle.seek(0)
print(handle.read())
handle.close()
# Open connection using config file
C = pyexasol.connect_local_config("test1", config_path=handle.name)
# Basic query
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchall())
Profiling¶
"""
Profiling with and without details
"""
import json
import pprint
import sys
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
C = pyexasol.connect(
dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
C.execute("ALTER SESSION SET QUERY_CACHE = 'OFF'")
C.execute("ALTER SESSION SET PROFILE = 'ON'")
# Normal profiling
stmt = C.execute(
"""
SELECT u.user_id, sum(p.gross_amt) AS total_gross_amt
FROM users u
LEFT JOIN payments p ON (u.user_id=p.user_id)
GROUP BY 1
ORDER BY 2 DESC NULLS LAST
LIMIT 10
"""
)
printer.pprint(stmt.fetchall())
json.dump(C.ext.explain_last(), sys.stdout, indent=4)
# Profiling with extra details per node (IPROC column)
C.execute(
"""
SELECT u.user_id, sum(p.gross_amt) AS total_gross_amt
FROM users u
LEFT JOIN payments p ON (u.user_id=p.user_id)
GROUP BY 1
ORDER BY 2 DESC NULLS LAST
LIMIT 10
"""
)
json.dump(C.ext.explain_last(details=True), sys.stdout, indent=4)
Snapshot Transactions¶
"""
Snapshot transactions
Explanations about locks: https://exasol.my.site.com/s/article/WAIT-FOR-COMMIT-on-SELECT-statement?language=en_US
"""
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
# First connection, read first table, update second table
C1 = pyexasol.connect(
dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
C1.set_autocommit(False)
C1.execute("SELECT * FROM TAB1")
C1.execute("INSERT INTO TAB2 VALUES (1)")
# Second connection, update first table
C2 = pyexasol.connect(
dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
C2.set_autocommit(False)
C2.execute("INSERT INTO TAB1 VALUES(1)")
C2.commit()
# Third connection, read second table
C3 = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
snapshot_transactions=True,
debug=True,
)
# Exasol locks on this query without snapshot transactions
stmt = C3.execute(
"SELECT column_name, column_type FROM EXA_ALL_COLUMNS WHERE column_schema=CURRENT_SCHEMA AND column_table='TAB2'"
)
printer.pprint(stmt.fetchall())
UDF Script Output¶
"""
Script output server
Exasol should be able to open connection to the host where current script is running
May not work on laptops due to firewall and network security restrictions
"""
import os
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
is_travis = "TRAVIS" in os.environ
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
query_timeout=5,
# Custom parameters are required to make this example work in Travis context
# 172.17.0.1 is an IP address of docker host in Linux
udf_output_bind_address=("", 8580) if is_travis else None,
udf_output_connect_address=("172.17.0.1", 8580) if is_travis else None,
)
# Normal script output from multiple VM's
stmt, log_files = C.execute_udf_output(
"""
SELECT echo_java(user_id)
FROM users
GROUP BY CEIL(RANDOM() * 4)
"""
)
printer.pprint(stmt.fetchall())
printer.pprint(log_files)
print(log_files[0].read_text())
# No rows selected, no VM's started by Exasol, no log files created
# execute_udf_output should not hang in such case
stmt, log_files = C.execute_udf_output(
"""
WITH cte_users AS (
SELECT user_id
FROM users
ORDER BY null
LIMIT 0
)
SELECT echo_java(user_id)
FROM cte_users
GROUP BY CEIL(RANDOM() * 4)
"""
)
printer.pprint(stmt.fetchall())
printer.pprint(log_files)
print(f"SELECTED {stmt.rowcount()} rows in {stmt.execution_time}s")
Class Extension¶
"""
Extend core PyEXASOL classes, add custom logic
In this example we add print_session_id() custom method to all objects
"""
import collections
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
class CustomExaStatement(pyexasol.ExaStatement):
def print_session_id(self):
print(f"Statement session_id: {self.connection.session_id()}")
class CustomExaFormatter(pyexasol.ExaFormatter):
def print_session_id(self):
print(f"Formatter session_id: {self.connection.session_id()}")
class CustomExaLogger(pyexasol.ExaLogger):
def print_session_id(self):
print(f"Logger session_id: {self.connection.session_id()}")
class CustomExaExtension(pyexasol.ExaExtension):
def print_session_id(self):
print(f"Extension session_id: {self.connection.session_id()}")
class CustomExaMetaData(pyexasol.ExaMetaData):
def print_session_id(self):
print(f"MetaData session_id: {self.connection.session_id()}")
class CustomExaConnection(pyexasol.ExaConnection):
# Set custom sub-classes here
cls_statement = CustomExaStatement
cls_formatter = CustomExaFormatter
cls_logger = CustomExaLogger
cls_extension = CustomExaExtension
cls_meta = CustomExaMetaData
def __init__(self, **kwargs):
if "custom_param" in kwargs:
print(f"Custom connection parameter: {kwargs['custom_param']}")
del kwargs["custom_param"]
super().__init__(**kwargs)
C = CustomExaConnection(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
custom_param="test custom param!",
)
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
# Access overloaded objects
stmt.print_session_id()
C.format.print_session_id()
C.logger.print_session_id()
C.ext.print_session_id()
C.meta.print_session_id()
C.close()
print("Return result rows as named tuples")
class NamedTupleExaStatement(pyexasol.ExaStatement):
def __next__(self):
row = super().__next__()
return self.cls_row(*row)
def _init_result_set(self, res):
super()._init_result_set(res)
self.cls_row = collections.namedtuple("Row", self.col_names)
class NamedTupleExaConnection(pyexasol.ExaConnection):
cls_statement = NamedTupleExaStatement
C = NamedTupleExaConnection(
dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
print(stmt.fetchone())
print(stmt.fetchone())
C.close()
Quoted Identifiers¶
"""
Major cases when 'quote_ident' connection option takes effect
"""
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
# Basic connect
C = pyexasol.connect(
dsn=config.dsn, user=config.user, password=config.password, quote_ident=True
)
# Open schema
C.open_schema(config.schema)
# Export from table name with lower case characters
pd = C.export_to_pandas("camelCaseTable")
pd.info()
stmt = C.last_statement()
print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
# Export into table name with lower case characters
C.import_from_pandas(pd, "camelCaseTable")
stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
# Ext
cols = C.ext.get_columns("camelCaseTable")
printer.pprint(cols)
cols = C.ext.get_columns_sql(
"SELECT * FROM {table_name!q}", {"table_name": "camelCaseTable"}
)
printer.pprint(cols)
cols = C.ext.get_columns((config.schema, "camelCaseTable"))
printer.pprint(cols)
cols = C.ext.get_sys_columns("camelCaseTable")
printer.pprint(cols)
Thread Safety¶
"""
Attempt to access connection object from multiple threads simultaneously
"""
import pprint
import threading
import time
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
class QueryThread(threading.Thread):
def __init__(self, connection):
self.connection = connection
super().__init__()
def run(self):
# Run heavy query
try:
self.connection.execute(
"SELECT * FROM users a, users b, users c, payments d"
)
except pyexasol.ExaQueryAbortError as e:
print(e.message)
except pyexasol.ExaConcurrencyError as e:
print(e.message)
# Basic connect
C = pyexasol.connect(
dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
# Try to run multiple query threads in parallel
query_thread_1 = QueryThread(C)
query_thread_2 = QueryThread(C)
query_thread_1.start()
query_thread_2.start()
time.sleep(1)
C.abort_query()
query_thread_1.join()
query_thread_2.join()
DSN Parsing¶
"""
Demonstration of DSN (Connection string) expansion and related exceptions
"""
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
verbose_error=False,
encryption=True,
)
print("IP range with custom port: ")
result = C._process_dsn("127.0.0.10..19:8564")
printer.pprint(sorted(result))
print("Multiple ranges with multiple ports and with default port at the end: ")
result = C._process_dsn("127.0.0.10..19:8564,127.0.0.20,localhost:8565,127.0.0.21..23")
printer.pprint(sorted(result))
print("Multiple ranges with fingerprint and port: ")
result = C._process_dsn("127.0.0.10..19/ABC,127.0.0.20,localhost/CDE:8564")
printer.pprint(sorted(result))
# Empty DSN
try:
result = C._process_dsn(" ")
printer.pprint(result)
except pyexasol.ExaConnectionDsnError as e:
print(e)
# Invalid range
try:
result = C._process_dsn("127.0.0.15..10")
printer.pprint(result)
except pyexasol.ExaConnectionDsnError as e:
print(e)
# Cannot resolve hostname
try:
result = C._process_dsn("test1..5.zlan")
printer.pprint(result)
except pyexasol.ExaConnectionDsnError as e:
print(e)
# Hostname range with zero-padding
try:
result = C._process_dsn("test01..20.zlan")
printer.pprint(result)
except pyexasol.ExaConnectionDsnError as e:
print(e)
HTTP Proxy¶
"""
Open connection with HTTP proxy
"""
import pprint
import subprocess
import time
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
# Simple HTTP proxy
pproxy = subprocess.Popen(["pproxy", "-l", "http://:8562/", "--reuse"])
time.sleep(1)
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
http_proxy="http://localhost:8562",
)
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchall())
C.close()
pproxy.terminate()
# HTTP proxy with auth
pproxy = subprocess.Popen(
["pproxy", "-l", "http://:8562/#my_user:secret_pass", "--reuse"]
)
time.sleep(1)
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
http_proxy="http://my_user:secret_pass@localhost:8562",
)
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchall())
C.close()
pproxy.terminate()
Garbage Collection¶
"""
Ensure ExaConnection and ExaStatement objects are garbage collected properly
"""
import gc
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
class ExaStatementVerbose(pyexasol.ExaStatement):
def __del__(self):
print(
f"ExaStatement session_id={self.connection.session_id()}, stmt_idx={self.stmt_idx} is about to be collected"
)
super().__del__()
class ExaConnectionVerbose(pyexasol.ExaConnection):
cls_statement = ExaStatementVerbose
def __del__(self):
print(f"ExaConnection {self.session_id()} is about to be collected")
super().__del__()
def run_test():
C = ExaConnectionVerbose(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
debug=False,
fetch_size_bytes=1024,
)
# Execute statement, read some data
st = C.execute("SELECT * FROM users")
st.fetchmany(5)
# Execute another statement, no more references for the first statement
st = C.execute("SELECT * FROM payments")
st.fetchmany(5)
del st
del C
print("Collect call 1 start")
gc.collect()
print("Collect call 1 finish")
C = ExaConnectionVerbose(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
debug=False,
fetch_size_bytes=1024,
)
st = C.execute("SELECT * FROM users")
st.fetchmany(5)
run_test()
print("Collect call 2 start")
gc.collect()
print("Collect call 2 finish")
print("Finishing script, nothing should be collected beyond this point!")
Examples of JSON libraries used for fetching¶
RapidJSON¶
"""
JSON library "rapidjson"
"""
import decimal
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
# Basic connect
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
json_lib="rapidjson",
)
edge_cases = [
# Biggest values
{
"dec36_0": decimal.Decimal("+" + ("9" * 36)),
"dec36_36": decimal.Decimal("+0." + ("9" * 36)),
"dbl": 1.7e308,
"bl": True,
"dt": "9999-12-31",
"ts": "9999-12-31 23:59:59.999",
"var100": "ひ" * 100,
"var2000000": "ひ" * 2000000,
},
# Smallest values
{
"dec36_0": decimal.Decimal("-" + ("9" * 36)),
"dec36_36": decimal.Decimal("-0." + ("9" * 36)),
"dbl": -1.7e308,
"bl": False,
"dt": "0001-01-01",
"ts": "0001-01-01 00:00:00",
"var100": "",
"var2000000": "ひ",
},
# All nulls
{
"dec36_0": None,
"dec36_36": None,
"dbl": None,
"bl": None,
"dt": None,
"ts": None,
"var100": None,
"var2000000": None,
},
]
insert_q = "INSERT INTO edge_case VALUES ({dec36_0!d}, {dec36_36!d}, {dbl!f}, {bl}, {dt}, {ts}, {var100}, {var2000000})"
select_q = "SELECT dec36_0, dec36_36, dbl, bl, dt, ts, var100, LENGTH(var2000000) AS len_var FROM edge_case"
C.execute("TRUNCATE TABLE edge_case")
# Insert (test formatting)
C.execute(insert_q, dict(edge_cases[0]))
C.execute(insert_q, dict(edge_cases[1]))
C.execute(insert_q, dict(edge_cases[2]))
# Select and fetch
stmt = C.execute(select_q)
printer.pprint(stmt.fetchall())
# Same actions with "exasol_mapper"
C.options["fetch_mapper"] = pyexasol.exasol_mapper
C.execute("TRUNCATE TABLE edge_case")
# Insert (test formatting)
C.execute(insert_q, dict(edge_cases[0]))
C.execute(insert_q, dict(edge_cases[1]))
C.execute(insert_q, dict(edge_cases[2]))
# Select and fetch
stmt = C.execute(select_q)
printer.pprint(stmt.fetchall())
# Import and export
edge_tuples = C.execute("SELECT * FROM edge_case").fetchall()
UJSON¶
"""
JSON library "ujson"
"""
import decimal
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
# Basic connect
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
json_lib="ujson",
)
edge_cases = [
# Biggest values
{
"dec36_0": decimal.Decimal("+" + ("9" * 36)),
"dec36_36": decimal.Decimal("+0." + ("9" * 36)),
"dbl": 1.7e308,
"bl": True,
"dt": "9999-12-31",
"ts": "9999-12-31 23:59:59.999",
"var100": "ひ" * 100,
"var2000000": "ひ" * 2000000,
},
# Smallest values
{
"dec36_0": decimal.Decimal("-" + ("9" * 36)),
"dec36_36": decimal.Decimal("-0." + ("9" * 36)),
"dbl": -1.7e308,
"bl": False,
"dt": "0001-01-01",
"ts": "0001-01-01 00:00:00",
"var100": "",
"var2000000": "ひ",
},
# All nulls
{
"dec36_0": None,
"dec36_36": None,
"dbl": None,
"bl": None,
"dt": None,
"ts": None,
"var100": None,
"var2000000": None,
},
]
insert_q = "INSERT INTO edge_case VALUES ({dec36_0!d}, {dec36_36!d}, {dbl!f}, {bl}, {dt}, {ts}, {var100}, {var2000000})"
select_q = "SELECT dec36_0, dec36_36, dbl, bl, dt, ts, var100, LENGTH(var2000000) AS len_var FROM edge_case"
C.execute("TRUNCATE TABLE edge_case")
# Insert (test formatting)
C.execute(insert_q, dict(edge_cases[0]))
C.execute(insert_q, dict(edge_cases[1]))
C.execute(insert_q, dict(edge_cases[2]))
# Select and fetch
stmt = C.execute(select_q)
printer.pprint(stmt.fetchall())
# Same actions with "exasol_mapper"
C.options["fetch_mapper"] = pyexasol.exasol_mapper
C.execute("TRUNCATE TABLE edge_case")
# Insert (test formatting)
C.execute(insert_q, dict(edge_cases[0]))
C.execute(insert_q, dict(edge_cases[1]))
C.execute(insert_q, dict(edge_cases[2]))
# Select and fetch
stmt = C.execute(select_q)
printer.pprint(stmt.fetchall())
# Import and export
edge_tuples = C.execute("SELECT * FROM edge_case").fetchall()
ORJSON¶
"""
JSON library "orjson"
"""
import decimal
import pprint
import _config as config
import pyexasol
printer = pprint.PrettyPrinter(indent=4, width=140)
# Basic connect
C = pyexasol.connect(
dsn=config.dsn,
user=config.user,
password=config.password,
schema=config.schema,
json_lib="orjson",
)
edge_cases = [
# Biggest values
{
"dec36_0": decimal.Decimal("+" + ("9" * 36)),
"dec36_36": decimal.Decimal("+0." + ("9" * 36)),
"dbl": 1.7e308,
"bl": True,
"dt": "9999-12-31",
"ts": "9999-12-31 23:59:59.999",
"var100": "ひ" * 100,
"var2000000": "ひ" * 2000000,
},
# Smallest values
{
"dec36_0": decimal.Decimal("-" + ("9" * 36)),
"dec36_36": decimal.Decimal("-0." + ("9" * 36)),
"dbl": -1.7e308,
"bl": False,
"dt": "0001-01-01",
"ts": "0001-01-01 00:00:00",
"var100": "",
"var2000000": "ひ",
},
# All nulls
{
"dec36_0": None,
"dec36_36": None,
"dbl": None,
"bl": None,
"dt": None,
"ts": None,
"var100": None,
"var2000000": None,
},
]
insert_q = "INSERT INTO edge_case VALUES ({dec36_0!d}, {dec36_36!d}, {dbl!f}, {bl}, {dt}, {ts}, {var100}, {var2000000})"
select_q = "SELECT dec36_0, dec36_36, dbl, bl, dt, ts, var100, LENGTH(var2000000) AS len_var FROM edge_case"
C.execute("TRUNCATE TABLE edge_case")
# Insert (test formatting)
C.execute(insert_q, dict(edge_cases[0]))
C.execute(insert_q, dict(edge_cases[1]))
C.execute(insert_q, dict(edge_cases[2]))
# Select and fetch
stmt = C.execute(select_q)
printer.pprint(stmt.fetchall())
# Same actions with "exasol_mapper"
C.options["fetch_mapper"] = pyexasol.exasol_mapper
C.execute("TRUNCATE TABLE edge_case")
# Insert (test formatting)
C.execute(insert_q, dict(edge_cases[0]))
C.execute(insert_q, dict(edge_cases[1]))
C.execute(insert_q, dict(edge_cases[2]))
# Select and fetch
stmt = C.execute(select_q)
printer.pprint(stmt.fetchall())
# Import and export
edge_tuples = C.execute("SELECT * FROM edge_case").fetchall()