Examples

Preparation

A few basic preparation steps are required before you can run the examples.

  1. Install PyEXASOL with optional dependencies.

  2. Download the PyEXASOL source code and unzip it.

  3. Make sure Exasol is installed and a dedicated schema for testing has been created. You may use the free Exasol Community Edition for testing purposes.

  4. Open the /examples/ directory and edit the file _config.py. Enter your Exasol credentials.

  5. Run the script that prepares the data set for testing:

    python examples/a00_prepare.py
    

That’s all. Now you can run the examples in any order, like ordinary Python scripts. E.g.:

python examples/a01_basic.py

Examples of core functions

Basic

minimal code to create a connection and run a query
"""
Open connection, run simple query, close connection
"""

import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(indent=4, width=140)

# Basic connect
C = pyexasol.connect(
    dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)

# Basic query
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchall())

# Disconnect
C.close()

Fetching Tuples

all methods of fetching a result set as tuples
"""
Fetch rows as tuples
"""

import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(indent=4, width=140)

# Basic connect (default mapper)
C = pyexasol.connect(
    dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)

# Fetch tuples row-by-row as iterator
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")

for row in stmt:
    printer.pprint(row)

# Fetch tuples row-by-row with fetchone
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")

while True:
    row = stmt.fetchone()

    if row is None:
        break

    printer.pprint(row)

# Fetch many
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchmany(3))
printer.pprint(stmt.fetchmany(3))

# Fetch everything in one go
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchall())

# Fetch one column as list of values
stmt = C.execute("SELECT user_id FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchcol())

# Fetch single value
stmt = C.execute("SELECT count(*) FROM users")
printer.pprint(stmt.fetchval())

Fetching Dictionaries

all methods of fetching a result set as dictionaries
"""
Fetch rows as dictionaries
"""

import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(indent=4, width=140)

# Basic connect (default mapper)
C = pyexasol.connect(
    dsn=config.dsn,
    user=config.user,
    password=config.password,
    schema=config.schema,
    fetch_dict=True,
)

# Fetch tuples row-by-row as iterator
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")

for row in stmt:
    printer.pprint(row)


# Fetch tuples row-by-row with fetchone
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")

while True:
    row = stmt.fetchone()

    if row is None:
        break

    printer.pprint(row)

# Fetch many
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchmany(3))
printer.pprint(stmt.fetchmany(3))

# Fetch everything in one go
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchall())

# Fetch one column as list of values (same as tuples example!)
stmt = C.execute("SELECT user_id FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchcol())

# Fetch single value (same as tuples example!)
stmt = C.execute("SELECT count(*) FROM users")
printer.pprint(stmt.fetchval())

Custom Data Type Mapper

apply a custom data type mapper when fetching
"""
Use custom mapper to get python objects out of Exasol fetch methods

DECIMAL(p,0)           -> int
DECIMAL(p,s)           -> decimal.Decimal
DOUBLE                 -> float
DATE                   -> datetime.date
TIMESTAMP              -> datetime.datetime
BOOLEAN                -> bool
VARCHAR                -> str
CHAR                   -> str
INTERVAL DAY TO SECOND -> datetime.timedelta (ExaTimeDelta)
<others>               -> str
"""

import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(indent=4, width=180)

# Basic connect (custom mapper
C = pyexasol.connect(
    dsn=config.dsn,
    user=config.user,
    password=config.password,
    schema=config.schema,
    fetch_mapper=pyexasol.exasol_mapper,
)

# Fetch objects
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchall())

# Test timestamp formats with different amount of decimal places
# Please note: Exasol stores timestamps with millisecond precision (3 decimal places)
# Lack of precision is not a bug, it's the documented feature

for i in range(0, 9):
    C.execute(f"ALTER SESSION SET NLS_TIMESTAMP_FORMAT='YYYY-MM-DD HH24:MI:SS.FF{i}'")
    printer.pprint(C.execute("SELECT TIMESTAMP'2018-01-01 03:04:05.123456'").fetchval())

# Test interval mapping
stmt = C.execute(
    "SELECT id, from_ts, to_ts, expected_timedelta, cast(to_ts - from_ts as varchar(100)) as expected_interval, to_ts - from_ts AS ts_diff FROM interval_test"
)
for row in stmt.fetchall():
    print(f"-------- Interval Test #{row[0]} --------")
    print(f"              FROM: {row[1]}")
    print(f"                TO: {row[2]}")

    print(f"EXPECTED TIMEDELTA: {row[3]}")
    actual_timedelta = printer.pformat(row[5])
    if actual_timedelta == row[3]:
        print(f"  \033[1;32mACTUAL TIMEDELTA: {actual_timedelta}\033[0m")
    else:
        print(f"  \033[1;31mACTUAL TIMEDELTA: {actual_timedelta}\033[0m")

    print(f" EXPECTED INTERVAL: {row[4]}")
    actual_interval = row[5].to_interval()
    if actual_interval == row[4]:
        print(f"   \033[1;32mACTUAL INTERVAL: {actual_interval}\033[0m")
    else:
        print(f"   \033[1;31mACTUAL INTERVAL: {actual_interval}\033[0m")

SQL Formatting

SQL text formatting
"""
Format values and identifiers using query_params and pyexasol formatter
"""

import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(indent=4, width=40)

# Basic connect
C = pyexasol.connect(
    dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)

# SQL with formatting
params = {
    "random_value": "abc",
    "null_value": None,
    "table_name_1": "users",
    "table_name_2": (config.schema, "PAYMENTS"),
    "user_rating": "0.5",
    "user_score": 1e1,
    "is_female": "TRUE",
    "user_statuses": ["ACTIVE", "PASSIVE", "SUSPENDED"],
    "exclude_user_score": [10, 20],
    "limit": 10,
}

query = """
    SELECT {random_value} AS random_value, {null_value} AS null_value, u.user_id, sum(gross_amt) AS gross_amt
    FROM {table_name_1!i} u
        JOIN {table_name_2!q} p ON (u.user_id=p.user_id)
    WHERE u.user_rating >= {user_rating!d}
        AND u.user_score > {user_score!f}
        AND u.is_female IS {is_female!r}
        AND u.status IN ({user_statuses})
        AND u.user_rating NOT IN ({exclude_user_score!d})
    GROUP BY 1,2,3
    ORDER BY 4 DESC
    LIMIT {limit!d}
"""

stmt = C.execute(query, params)
print(stmt.query)
printer.pprint(stmt.fetchall())

Transaction Management

transaction management, autocommit
"""
Transactions
"""

import _config as config

import pyexasol

# Connect with autocommit OFF
C = pyexasol.connect(
    dsn=config.dsn,
    user=config.user,
    password=config.password,
    schema=config.schema,
    autocommit=False,
)

# Another way to change autocommit after connection
C.set_autocommit(False)

stmt = C.execute("TRUNCATE TABLE users")
print(f"Truncate affected {stmt.rowcount()} rows")

C.rollback()
print("Truncate was rolled back")

stmt = C.execute("SELECT count(*) FROM users")
print(f"Select affected {stmt.fetchval()} rows")

C.commit()
print("Select was committed")

Error Handling

error handling for basic SQL queries
"""
Exceptions for basic queries
"""

import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(indent=4, width=140)

# Bad dsn
try:
    C = pyexasol.connect(
        dsn="123" + config.dsn,
        user=config.user,
        password=config.password,
        schema=config.schema,
    )
except pyexasol.ExaConnectionError as e:
    print(e)

# Bad user \ password
try:
    C = pyexasol.connect(
        dsn=config.dsn,
        user=config.user,
        password="123" + config.password,
        schema=config.schema,
    )
except pyexasol.ExaAuthError as e:
    print(e)

C = pyexasol.connect(
    dsn=config.dsn,
    user=config.user,
    password=config.password,
    schema=config.schema,
    fetch_size_bytes=1024 * 10,
)

# Invalid SQL
try:
    stmt = C.execute(
        """
        SELECT1 *
        FROM users 
        ORDER BY user_id 
        LIMIT 5
    """
    )
except pyexasol.ExaQueryError as e:
    print(e)


# Valid SQL, but error during execution
try:
    stmt = C.execute(
        """
        SELECT *
        FROM users 
        WHERE user_name = 10
        ORDER BY user_id
        LIMIT 5
    """
    )
except pyexasol.ExaQueryError as e:
    print(e)

# Attempt to read from closed cursor
stmt = C.execute("SELECT * FROM payments")
stmt.fetchone()
stmt.close()

try:
    stmt.fetchall()
except pyexasol.ExaRequestError as e:
    print(e)

# Attempt to fetch query without result set
stmt = C.execute("COMMIT")

try:
    stmt.fetchone()
except pyexasol.ExaRuntimeError as e:
    print(e)

# Attempt to run SELECT with duplicate column names
try:
    stmt = C.execute(
        """
        SELECT 1, 1, 2 AS user_id, 3 AS user_id
        FROM dual
    """
    )
except pyexasol.ExaRuntimeError as e:
    print(e)

# Attempt to run query on closed connection
C.close()

try:
    C.execute("SELECT 1")
except pyexasol.ExaRuntimeError as e:
    print(e)

# Simulate websocket error during close
C1 = pyexasol.connect(
    dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
C2 = pyexasol.connect(
    dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)

C2.execute(f"KILL SESSION {C1.session_id()}")

try:
    C1.close()
except pyexasol.ExaError as e:
    print(e)

Extension Functions

extension functions to help with common problems outside the scope of the database driver
"""
Extension functions
Metadata functions are deprecated in favor of new "meta"-functions
"""

import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(indent=4, width=140)

# Basic connect
C = pyexasol.connect(
    dsn=config.dsn,
    user=config.user,
    password=config.password,
    schema=config.schema,
    lower_ident=True,
)

cols = C.ext.get_columns("users")
printer.pprint(cols)

cols = C.ext.get_columns_sql("SELECT * FROM users")
printer.pprint(cols)

cols = C.ext.get_sys_columns("users")
printer.pprint(cols)

tables = C.ext.get_sys_tables()
printer.pprint(tables)

views = C.ext.get_sys_views()
printer.pprint(views)

schemas = C.ext.get_sys_schemas()
printer.pprint(schemas)

reserved_words = C.ext.get_reserved_words()
printer.pprint(reserved_words[0:5])

occupied_space = C.ext.get_disk_space_usage()
printer.pprint(occupied_space)

pd = C.ext.export_to_pandas_with_dtype("users")
pd.info()

pd = C.ext.export_to_pandas_with_dtype("payments")
pd.info()

Abort Query

abort a running query from a separate thread
"""
Abort long running query from another thread
"""

import pprint
import threading
import time

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(indent=4, width=140)


class QueryThread(threading.Thread):
    def __init__(self, connection):
        self.connection = connection
        super().__init__()

    def run(self):
        try:
            # Run heavy query
            self.connection.execute(
                "SELECT * FROM users a, users b, users c, payments d"
            )
        except pyexasol.ExaQueryAbortError as e:
            print(e.message)


# Basic connect
C = pyexasol.connect(
    dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)

# Launch the heavy query in the background
worker = QueryThread(C)
worker.start()

# Give the query a one-second head start, then abort it from this thread
time.sleep(1)
C.abort_query()

# Wait until the worker has handled the abort
worker.join()

Context Manager

use the Python `with` statement for ExaConnection and ExaStatement objects
"""
Use context manager ("with" statement) for ExaConnection and ExaStatement objects
"""

import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(indent=4, width=140)

# Basic usage
with pyexasol.connect(
    dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
) as C:
    with C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5") as stmt:
        printer.pprint(stmt.fetchall())

    printer.pprint(stmt.is_closed)

printer.pprint(C.is_closed)

# Exception causes connection and statement to be closed
try:
    with pyexasol.connect(
        dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
    ) as C:
        with C.execute("SELECT * FROM unknown_table LIMIT 5") as stmt:
            printer.pprint(stmt.fetchall())
except pyexasol.ExaQueryError as e:
    print(e)

printer.pprint(stmt.is_closed)
printer.pprint(C.is_closed)

Insert Multiple Rows

INSERT a small number of rows using prepared statements instead of HTTP transport
"""
INSERT multiple rows using prepared statements
"""

import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(indent=4, width=140)


C = pyexasol.connect(
    dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)

# Insert from list
C.execute("TRUNCATE TABLE users_copy")

all_users = C.execute("SELECT * FROM users").fetchall()
st = C.ext.insert_multi("users_copy", all_users)

print(f"INSERTED {st.rowcount()} rows in {st.execution_time}s")


# Insert from generator with shuffled order of columns
C.execute("TRUNCATE TABLE users_copy")


def users_generator():
    for i in range(10000):
        yield (i, "abcabc", "2019-01-01 01:02:03", "2019-02-01", "PENDING", False)


st = C.ext.insert_multi(
    "users_copy",
    users_generator(),
    columns=[
        "user_id",
        "user_name",
        "last_visit_ts",
        "register_dt",
        "status",
        "is_female",
    ],
)

print(f"INSERTED {st.rowcount()} rows in {st.execution_time}s")


# Attempt to insert empty data leads to failure, unlike .import_from_iterable()
try:
    C.ext.insert_multi("users_copy", [])
except pyexasol.ExaRuntimeError as e:
    print(e)

Metadata Requests

lock-free metadata requests
"""
Lock-free meta data requests
"""

import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(indent=4, width=140)


C = pyexasol.connect(
    dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)

# Get columns without executing SQL query
columns = C.meta.sql_columns("SELECT a.*, a.user_id + 1 AS next_user_id FROM users a")
printer.pprint(columns)


# Schema exists
val = C.meta.schema_exists(C.current_schema())
print(f"Schema exists: {val}")

# Schema does not exist
val = C.meta.schema_exists("abcabcabc")
print(f"Schema exists: {val}")


# Table exists
val = C.meta.table_exists("users")
print(f"Table exists: {val}")

# Table exists (with schema name)
val = C.meta.table_exists((C.current_schema(), "users"))
print(f"Table exists: {val}")

# Table does not exist
val = C.meta.table_exists("abcabcabc")
print(f"Table exists: {val}")


# View exists
val = C.meta.view_exists("users_view")
print(f"View exists: {val}")

# View exists (with schema name)
val = C.meta.view_exists((C.current_schema(), "users_view"))
print(f"View exists: {val}")

# View does not exist
val = C.meta.view_exists("abcabcabc")
print(f"View exists: {val}")


# List schemas
val = C.meta.list_schemas()
printer.pprint(val)

# List schemas with filters
val = C.meta.list_schemas(schema_name_pattern="PYEXASOL%")
printer.pprint(val)


# List tables with filters
val = C.meta.list_tables(table_schema_pattern="PYEXASOL%", table_name_pattern="USERS%")
printer.pprint(val)


# List views with filters
val = C.meta.list_views(view_schema_pattern="PYEXASOL%", view_name_pattern="USERS%")
printer.pprint(val)


# List columns with filters
val = C.meta.list_columns(
    column_schema_pattern="PYEXASOL%",
    column_table_pattern="USERS%",
    column_object_type_pattern="TABLE",
    column_name_pattern="%_ID%",
)
printer.pprint(val)


# List objects with filters
val = C.meta.list_objects(object_name_pattern="USERS%", object_type_pattern="TABLE")
printer.pprint(val)


# List object sizes with filters
val = C.meta.list_object_sizes(
    object_name_pattern="USERS%", object_type_pattern="TABLE"
)
printer.pprint(val)


# List indices
val = C.meta.list_indices(index_schema_pattern="PYEXASOL%")
printer.pprint(val)


# List keywords
val = C.meta.list_sql_keywords()
printer.pprint(val)

No-SQL Metadata

no-SQL metadata commands introduced in Exasol v7.0+
"""
No SQL lock-free meta data requests introduced in Exasol 7.0, WebSocket protocol v2
"""

import pprint
import sys

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(indent=4, width=140)


C = pyexasol.connect(
    dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)

if C.protocol_version() < pyexasol.PROTOCOL_V2:
    print("Actual protocol version is less than 2, skipping meta_nosql checks")
    sys.exit(0)


# Schema exists
val = C.meta.schema_exists(C.current_schema())
print(f"Schema exists: {val}")

# Schema does not exist
val = C.meta.schema_exists("abcabcabc")
print(f"Schema exists: {val}")


# Table exists
val = C.meta.table_exists("users")
print(f"Table exists: {val}")

# Table exists (with schema name)
val = C.meta.table_exists((C.current_schema(), "users"))
print(f"Table exists: {val}")

# Table does not exist
val = C.meta.table_exists("abcabcabc")
print(f"Table exists: {val}")


# View exists
val = C.meta.view_exists("users_view")
print(f"View exists: {val}")

# View exists (with schema name)
val = C.meta.view_exists((C.current_schema(), "users_view"))
print(f"View exists: {val}")

# View does not exist
val = C.meta.view_exists("abcabcabc")
print(f"View exists: {val}")


# List schemas
st = C.meta.execute_meta_nosql("getSchemas")
printer.pprint(st.fetchall())

# List schemas with filters
st = C.meta.execute_meta_nosql("getSchemas", {"schema": "PYEXASOL%"})
printer.pprint(st.fetchall())


# List tables with filters
st = C.meta.execute_meta_nosql(
    "getTables",
    {
        "schema": "PYEXASOL%",
        "table": "USERS%",
        "tableTypes": ["TABLE"],
    },
)
printer.pprint(st.fetchall())


# List views with filters
st = C.meta.execute_meta_nosql(
    "getTables",
    {
        "schema": "PYEXASOL%",
        "table": "USERS%",
        "tableTypes": ["VIEW"],
    },
)
printer.pprint(st.fetchall())


# List columns with filters
st = C.meta.execute_meta_nosql(
    "getColumns",
    {
        "schema": "PYEXASOL%",
        "table": "USERS%",
        "column": "%_ID%",
    },
)
printer.pprint(st.fetchall())


# List keywords
val = C.meta.list_sql_keywords()
printer.pprint(val[0:10])


# Exception handling
try:
    st = C.meta.execute_meta_nosql(
        "getColumns",
        {
            "schema": "PYEXASOL%",
            "table": "USERS%",
            "column": ["%_ID%"],
        },
    )
except pyexasol.ExaRequestError as e:
    print(e)

Examples of HTTP transport

Pandas DataFrame

IMPORT / EXPORT to and from pandas.DataFrame
"""
HTTP Transport

EXPORT and IMPORT from Exasol to Pandas DataFrames
Make sure to enable compression for Wifi connections to improve performance
"""

import _config as config

import pyexasol

# Connect with compression enabled
C = pyexasol.connect(
    dsn=config.dsn,
    user=config.user,
    password=config.password,
    schema=config.schema,
    compression=True,
)

C.execute("TRUNCATE TABLE users_copy")

# Export from Exasol table into pandas.DataFrame
pd = C.export_to_pandas("users")
pd.info()

stmt = C.last_statement()
print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

# Import from pandas DataFrame into Exasol table
C.import_from_pandas(pd, "users_copy")

stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

# Export from SQL query
pd = C.export_to_pandas("SELECT user_id, user_name FROM users WHERE user_id >= 5000")
pd.info()

stmt = C.last_statement()
print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

Other Methods

other methods of IMPORT / EXPORT
"""
HTTP transport

EXPORT and IMPORT from Exasol to and from files / lists, etc.
"""

import os
import shutil
import tempfile

import _config as config

import pyexasol

# Connect with compression enabled
C = pyexasol.connect(
    dsn=config.dsn,
    user=config.user,
    password=config.password,
    schema=config.schema,
    compression=True,
)

# Prepare empty tables
C.execute("TRUNCATE TABLE users_copy")
C.execute("TRUNCATE TABLE users_copy_reordered")
C.execute("TRUNCATE TABLE payments_copy")

# Create temporary file
file = tempfile.TemporaryFile()

# Export to temporary file
C.export_to_file(
    file,
    "users",
    export_params={
        "with_column_names": True,
        "comment": "Exporting users to temp file",
    },
)

file.seek(0)
print(file.readline())
print(file.readline())
print(file.readline())
file.seek(0)

stmt = C.last_statement()
print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

# Import from temporary file
C.import_from_file(file, "users_copy", import_params={"skip": 1})

stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

file.close()

# Export to list
users = C.export_to_list("users")
print(users[0])
print(users[1])

stmt = C.last_statement()
print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

# Import from list (or any other iterable)
C.import_from_iterable(users, "users_copy")

stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")


# Export to custom callback
def my_export_callback(pipe, dst):
    lines = list()

    # Save 2 first lines
    lines.append(pipe.readline())
    lines.append(pipe.readline())

    # Dump everything else to /dev/null
    dev_null = open(os.devnull, "wb")
    shutil.copyfileobj(pipe, dev_null)
    dev_null.close()

    return lines


res = C.export_to_callback(my_export_callback, None, "users")
print(res)

stmt = C.last_statement()
print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")


# Import from custom callback
def my_import_callback(pipe, src):
    for line in src:
        pipe.write(line)


C.import_from_callback(my_import_callback, res, "users_copy")
stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

# Export as gzipped file
file = tempfile.TemporaryFile()

C.export_to_file(
    file, "users", export_params={"with_column_names": True, "format": "gz"}
)

file.seek(0)
print(file.read(100))
file.seek(0)

stmt = C.last_statement()
print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

# Import gzipped file
C.import_from_file(file, "users_copy", import_params={"skip": 1, "format": "gz"})

stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

file.close()


# Custom encoding for IMPORT and EXPORT
file = tempfile.TemporaryFile()

C.export_to_file(file, "users", export_params={"encoding": "WINDOWS-1251"})

file.seek(0)
print(file.readline())
file.seek(0)

# Import file with custom encoding
C.import_from_file(file, "users_copy", import_params={"encoding": "WINDOWS-1251"})

stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

file.close()

# Custom columns list for EXPORT
file = tempfile.TemporaryFile()

C.export_to_file(
    file,
    "users",
    export_params={"columns": ["register_dt", "user_id", "status", "user_name"]},
)

file.seek(0)
print(file.readline())
file.seek(0)

# Custom columns list for IMPORT
C.import_from_file(
    file,
    "users_copy_reordered",
    import_params={"columns": ["register_dt", "user_id", "status", "user_name"]},
)

stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

file.close()

# Custom CSV cols formatting
file = tempfile.TemporaryFile()

C.export_to_file(
    file,
    "users",
    export_params={
        "csv_cols": [
            "1",
            "2",
            "3 FORMAT='DD-MM-YYYY'",
            "4..6",
            "7 FORMAT='999.99999'",
            "8",
        ]
    },
)

file.seek(0)
print(file.readline())
file.seek(0)

C.import_from_file(
    file,
    "users_copy",
    import_params={
        "csv_cols": [
            "1",
            "2",
            "3 FORMAT='DD-MM-YYYY'",
            "4..6",
            "7 FORMAT='999.99999'",
            "8",
        ]
    },
)

stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

Parallel Export

multi-process HTTP transport for EXPORT
"""
Parallel HTTP transport

EXPORT into multiple independent processes running in parallel
"""

import multiprocessing
import pprint

import _config as config

import pyexasol
import pyexasol.callback as cb

printer = pprint.PrettyPrinter(indent=4, width=140)


class ExportProc(multiprocessing.Process):
    def __init__(self, node):
        self.node = node
        self.read_pipe, self.write_pipe = multiprocessing.Pipe(False)

        super().__init__()

    def start(self):
        super().start()
        self.write_pipe.close()

    @property
    def exa_address(self):
        return self.read_pipe.recv()

    def run(self):
        self.read_pipe.close()

        # Init HTTP transport connection
        http = pyexasol.http_transport(self.node["ipaddr"], self.node["port"])

        # Send internal Exasol address to parent process
        self.write_pipe.send(http.exa_address)
        self.write_pipe.close()

        # Read data from HTTP transport to DataFrame
        pd = http.export_to_callback(cb.export_to_pandas, None)
        print(f"Child process {self.node['idx']} finished, exported rows: {len(pd)}")


if __name__ == "__main__":
    # Number of nodes requested from get_nodes(); one child process per node
    pool_size = 5
    pool = []
    exa_address_list = []

    C = pyexasol.connect(
        dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
    )

    for n in C.get_nodes(pool_size):
        proc = ExportProc(n)
        proc.start()

        pool.append(proc)
        # Blocks until this child has opened its HTTP transport and reported its address
        exa_address_list.append(proc.exa_address)

    printer.pprint(pool)
    printer.pprint(exa_address_list)

    try:
        # Single EXPORT statement whose output is distributed across all child addresses
        C.export_parallel(
            exa_address_list,
            "SELECT * FROM payments",
            export_params={"with_column_names": True},
        )
    except (Exception, KeyboardInterrupt):
        # Children may otherwise block waiting for data; terminate them
        for p in pool:
            p.terminate()
    else:
        stmt = C.last_statement()
        print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
    finally:
        # Reap every child process on both success and failure paths
        for p in pool:
            p.join()

Parallel Import

multi-process HTTP transport for IMPORT
"""
Parallel HTTP transport

IMPORT from multiple independent processes running in parallel
"""

import multiprocessing
import pprint

import _config as config
import pandas

import pyexasol
import pyexasol.callback as cb

printer = pprint.PrettyPrinter(indent=4, width=140)


class ImportProc(multiprocessing.Process):
    def __init__(self, node):
        self.node = node
        self.read_pipe, self.write_pipe = multiprocessing.Pipe(False)

        super().__init__()

    def start(self):
        super().start()
        self.write_pipe.close()

    @property
    def exa_address(self):
        return self.read_pipe.recv()

    def run(self):
        self.read_pipe.close()

        # Init HTTP transport connection
        http = pyexasol.http_transport(self.node["ipaddr"], self.node["port"])

        # Send internal Exasol address to parent process
        self.write_pipe.send(http.exa_address)
        self.write_pipe.close()

        data = [
            {"user_id": 1, "user_name": "John", "shard_id": self.node["idx"]},
            {"user_id": 2, "user_name": "Foo", "shard_id": self.node["idx"]},
            {"user_id": 3, "user_name": "Bar", "shard_id": self.node["idx"]},
        ]

        pd = pandas.DataFrame(data, columns=["user_id", "user_name", "shard_id"])

        # Send data from DataFrame to HTTP transport
        http.import_from_callback(cb.import_from_pandas, pd)
        print(f"Child process {self.node['idx']} finished, imported rows: {len(pd)}")


if __name__ == "__main__":
    # Number of nodes requested from get_nodes(); one child process per node
    pool_size = 5
    pool = []
    exa_address_list = []

    C = pyexasol.connect(
        dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
    )

    C.execute("TRUNCATE TABLE parallel_import")

    for n in C.get_nodes(pool_size):
        proc = ImportProc(n)
        proc.start()

        pool.append(proc)
        # Blocks until this child has opened its HTTP transport and reported its address
        exa_address_list.append(proc.exa_address)

    printer.pprint(pool)
    printer.pprint(exa_address_list)

    try:
        # Single IMPORT statement that reads from all child addresses
        C.import_parallel(exa_address_list, "parallel_import")
    except (Exception, KeyboardInterrupt):
        # Children may otherwise block on their HTTP transport; terminate them
        for p in pool:
            p.terminate()
    else:
        stmt = C.last_statement()
        print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
    finally:
        # Reap every child process on both success and failure paths
        for p in pool:
            p.join()

Parallel Export/Import

multi-process HTTP transport for EXPORT followed by IMPORT
"""
Parallel HTTP transport

EXPORT data, process it and IMPORT back to Exasol
Do it in multiple independent processes running in parallel

Compression and encryption are enabled in this example
"""

import multiprocessing
import pprint

import _config as config

import pyexasol
import pyexasol.callback as cb

printer = pprint.PrettyPrinter(indent=4, width=140)


class ExportProc(multiprocessing.Process):
    def __init__(self, node):
        self.node = node
        self.read_pipe, self.write_pipe = multiprocessing.Pipe(False)

        super().__init__()

    def start(self):
        super().start()
        self.write_pipe.close()

    @property
    def exa_address_pair(self):
        return self.read_pipe.recv()

    def run(self):
        self.read_pipe.close()

        # Init separate HTTP transport connections for EXPORT and IMPORT
        http_export = pyexasol.http_transport(
            self.node["ipaddr"], self.node["port"], compression=True, encryption=True
        )
        http_import = pyexasol.http_transport(
            self.node["ipaddr"], self.node["port"], compression=True, encryption=True
        )

        # Send pairs of internal Exasol address to parent process
        self.write_pipe.send([http_export.exa_address, http_import.exa_address])
        self.write_pipe.close()

        # Read data from HTTP transport to DataFrame
        pd = http_export.export_to_callback(cb.export_to_pandas, None)
        print(
            f"EXPORT child process {self.node['idx']} finished, exported rows:{len(pd)}"
        )

        # Modify data set
        pd["GROSS_AMT"] = pd["GROSS_AMT"] + 1

        # Write data back to HTTP transport
        http_import.import_from_callback(cb.import_from_pandas, pd)
        print(
            f"IMPORT child process {self.node['idx']} finished, imported rows:{len(pd)}"
        )


if __name__ == "__main__":
    # Number of nodes requested from get_nodes(); one child process per node
    pool_size = 8
    pool = []
    exa_address_export = []
    exa_address_import = []

    C = pyexasol.connect(
        dsn=config.dsn,
        user=config.user,
        password=config.password,
        schema=config.schema,
        compression=True,
        encryption=True,
    )

    C.execute("TRUNCATE TABLE payments_copy")

    for i in C.get_nodes(pool_size):
        proc = ExportProc(i)
        proc.start()

        pool.append(proc)
        # Blocks until this child reports its [export_address, import_address] pair
        pair = proc.exa_address_pair

        exa_address_export.append(pair[0])
        exa_address_import.append(pair[1])

    printer.pprint(pool)
    printer.pprint(exa_address_export)
    printer.pprint(exa_address_import)

    # IMPORT is only attempted when EXPORT succeeded. After a failed EXPORT the
    # children are terminated, so nothing would ever feed the IMPORT addresses.
    export_succeeded = False

    try:
        C.export_parallel(
            exa_address_export,
            "SELECT * FROM payments",
            export_params={"with_column_names": True},
        )
    except (Exception, KeyboardInterrupt):
        for p in pool:
            p.terminate()
    else:
        export_succeeded = True
        stmt = C.last_statement()
        print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

    if export_succeeded:
        try:
            C.import_parallel(exa_address_import, "payments_copy")
        except (Exception, KeyboardInterrupt):
            for p in pool:
                p.terminate()
        else:
            stmt = C.last_statement()
            print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

    # Reap every child process exactly once, whichever path was taken
    for p in pool:
        p.join()

HTTP Transport Errors

various ways to break HTTP transport and handle errors
"""
Parallel HTTP transport

Edge cases, killing & failing various components at different times
"""

import os
import pprint
import shutil
import threading
import time
import traceback

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(width=140, indent=4)

# Binary sink the callbacks below use to drain HTTP transport streams
dev_null = open(os.devnull, mode="wb")


C = pyexasol.connect(
    debug=False,
    dsn=config.dsn,
    user=config.user,
    password=config.password,
    schema=config.schema,
)


###
# Normal execution
###


def observer_callback(pipe, dst, **kwargs):
    """EXPORT callback: print the number of live threads, then discard the data.

    ``dst`` and any extra keyword arguments are accepted but not used.
    """
    live_threads = threading.active_count()
    print(f"Threads running: {live_threads}")
    shutil.copyfileobj(pipe, dev_null)


C.export_to_callback(observer_callback, None, "SELECT * FROM users LIMIT 1000")
print("--- Finished Observer callback (normal execution) ---\n")


###
# SQL error (the query references the table "usersaaa")
###


try:
    C.export_to_callback(observer_callback, None, "SELECT * FROM usersaaa LIMIT 1000")
except Exception:
    # Show the full traceback, then continue with the next scenario
    traceback.print_exc()

print("--- Finished Observer callback (SQL error) ---\n")


###
# Abort SQL query
###


def abort_query_callback(pipe, dst, **kwargs):
    """EXPORT callback that aborts the running query before reading any data."""
    C.abort_query()
    # Pause so the abort can take effect before the stream is drained
    time.sleep(2)
    shutil.copyfileobj(pipe, dev_null)


try:
    C.export_to_callback(abort_query_callback, None, "SELECT * FROM users LIMIT 1000")
except pyexasol.ExaError:
    traceback.print_exc()

print("--- Finished Abort Query ---\n")


###
# Error from callback EXPORT
###


def runtime_error_callback(pipe, dst, **kwargs):
    """EXPORT callback that reads a few bytes, waits, then fails on purpose."""
    pipe.read(10)
    time.sleep(1)
    raise RuntimeError("Test error!")


try:
    C.export_to_callback(runtime_error_callback, None, "SELECT * FROM users LIMIT 1000")
except Exception:
    traceback.print_exc()

print("--- Finished Runtime Error EXPORT Callback ---\n")


###
# Error from callback IMPORT
###


def runtime_error_callback(pipe, src, **kwargs):
    """IMPORT callback that writes a partial row, waits, then fails on purpose.

    Shadows the EXPORT callback of the same name defined earlier in the script.
    """
    pipe.write(b"a,b,c,d")
    time.sleep(1)
    raise RuntimeError("Test error!")


try:
    C.import_from_callback(runtime_error_callback, None, "users_copy")
except Exception:
    traceback.print_exc()

print("--- Finished Runtime Error IMPORT Callback ---\n")


###
# Close WS connection
###


def close_connection_callback(pipe, dst, **kwargs):
    """EXPORT callback that closes the connection while the transfer is running.

    ``C.close(disconnect=False)`` is called before the stream is drained.
    """
    C.close(disconnect=False)
    time.sleep(1)
    shutil.copyfileobj(pipe, dev_null)


try:
    C.export_to_callback(
        close_connection_callback, None, "SELECT * FROM users LIMIT 1000"
    )
except Exception:
    traceback.print_exc()

print("--- Finished Close Connection ---\n")

Examples of misc functions

Connection Redundancy

connection redundancy, handling of missing nodes
"""
Connection redundancy, attempts to connect to all hosts from DSN in random order
"""

import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(width=140, indent=4)

# Prepend a range of bogus hosts (0.42.42.40..49) to the real DSN. Attempts on
# those should fail (each one limited by connection_timeout=2), and the driver
# should end up connected to one of the hosts from config.dsn.
C = pyexasol.connect(
    dsn=f"0.42.42.40..49,{config.dsn}",
    user=config.user,
    password=config.password,
    schema=config.schema,
    connection_timeout=2,
    debug=True,
)

Edge Cases

storing and fetching biggest and smallest values for data types available in Exasol
"""
Try to read and write minimum and maximum possible values, test various edge cases
"""

import decimal
import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(width=140, indent=4)

# Plain connection with the default fetch mapper
C = pyexasol.connect(
    dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)

# One row per scenario: upper limits, lower limits, all NULLs
edge_cases = [
    {
        "dec36_0": decimal.Decimal("+" + "9" * 36),
        "dec36_36": decimal.Decimal("+0." + "9" * 36),
        "dbl": 1.7e308,
        "bl": True,
        "dt": "9999-12-31",
        "ts": "9999-12-31 23:59:59.999",
        "var100": "ひ" * 100,
        "var2000000": "ひ" * 2000000,
    },
    {
        "dec36_0": decimal.Decimal("-" + "9" * 36),
        "dec36_36": decimal.Decimal("-0." + "9" * 36),
        "dbl": -1.7e308,
        "bl": False,
        "dt": "0001-01-01",
        "ts": "0001-01-01 00:00:00",
        "var100": "",
        "var2000000": "ひ",
    },
    dict.fromkeys(
        ("dec36_0", "dec36_36", "dbl", "bl", "dt", "ts", "var100", "var2000000")
    ),
]

insert_q = "INSERT INTO edge_case VALUES ({dec36_0!d}, {dec36_36!d}, {dbl!f}, {bl}, {dt}, {ts}, {var100}, {var2000000})"
select_q = "SELECT dec36_0, dec36_36, dbl, bl, dt, ts, var100, LENGTH(var2000000) AS len_var FROM edge_case"

C.execute("TRUNCATE TABLE edge_case")

# Insert every scenario (exercises the !d / !f formatters and NULL handling)
for edge_case in edge_cases:
    C.execute(insert_q, dict(edge_case))

# Read everything back
stmt = C.execute(select_q)
printer.pprint(stmt.fetchall())


# Same round trip again, this time fetching through "exasol_mapper"
C.options["fetch_mapper"] = pyexasol.exasol_mapper
C.execute("TRUNCATE TABLE edge_case")

for edge_case in edge_cases:
    C.execute(insert_q, dict(edge_case))

stmt = C.execute(select_q)
printer.pprint(stmt.fetchall())

# HTTP transport round trip: re-import the stored rows, then export them again
edge_tuples = C.execute("SELECT * FROM edge_case").fetchall()

C.execute("TRUNCATE TABLE edge_case")

C.import_from_iterable(edge_tuples, "edge_case")
stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

res = C.export_to_list(select_q)
stmt = C.last_statement()
print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")
printer.pprint(res)

# Cross join, so that the response carries both a resultSetHandle and the
# first chunk of data
stmt = C.execute("SELECT a.* FROM edge_case a, edge_case b, edge_case c, edge_case d")
print(f"Rows total: {stmt.num_rows_total}, rows chunk: {stmt.num_rows_chunk}")
print(f"Rows actually returned {len(list(stmt))}")


# Very large query: the 2,000,000-character value is inlined five times
stmt = C.execute(
    "SELECT {val1} AS val1, {val2} AS val2, {val3} AS val3, {val4} AS val4, {val5} AS val5",
    {f"val{n}": edge_cases[0]["var2000000"] for n in range(1, 6)},
)

print(f"Query length: {len(stmt.query)}")
first_row = stmt.fetchone()
print(f"Result column length: {len(first_row[0])}")

# Very large query with compression.
# Close the current (uncompressed) connection first; rebinding C without
# closing it would leave that session open until garbage collection.
C.close()

C = pyexasol.connect(
    dsn=config.dsn,
    user=config.user,
    password=config.password,
    schema=config.schema,
    compression=True,
    encryption=True,
)

# Same query as before: the 2,000,000-character value inlined five times
stmt = C.execute(
    "SELECT {val1} AS val1, {val2} AS val2, {val3} AS val3, {val4} AS val4, {val5} AS val5",
    {f"val{n}": edge_cases[0]["var2000000"] for n in range(1, 6)},
)

print(f"Query length: {len(stmt.query)}")
print(f"Result column length: {len(stmt.fetchone()[0])}")

DB-API 2.0 Compatibility

DB-API 2.0 compatibility wrapper
"""
Basic compatibility with DB-API 2.0
Suitable for temporary testing only, should not be used in production
"""

import pprint

import _config as config

import pyexasol.db2

printer = pprint.PrettyPrinter(width=140, indent=4)

C = pyexasol.db2.connect(
    dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
cur = C.cursor()

# Row by row: fetchone() returns None once the result set is exhausted
cur.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
for row in iter(cur.fetchone, None):
    printer.pprint(row)

# Two batches of (at most) three rows
cur.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
for _ in range(2):
    printer.pprint(cur.fetchmany(3))

# Everything at once
cur.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
printer.pprint(cur.fetchall())

# Metadata of the last executed query
printer.pprint(cur.description)
printer.pprint(cur.rowcount)

# Autocommit is False by default
print(C.attr["autocommit"])
C.commit()

SSL Encryption

SSL-encrypted WebSocket connection and HTTP transport
"""
Connection with SSL encryption enabled
It works both for WebSocket communication (wss://) and HTTP(S) Transport
"""

import hashlib
import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(width=140, indent=4)

# encryption=True covers both the WebSocket (wss://) and HTTP(S) transport
C = pyexasol.connect(
    dsn=config.dsn,
    user=config.user,
    password=config.password,
    schema=config.schema,
    encryption=True,
)

# Regular query over the encrypted WebSocket
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
printer.pprint(stmt.fetchall())

# EXPORT through encrypted HTTP transport
users = C.export_to_list("SELECT * FROM users ORDER BY user_id LIMIT 5")
stmt = C.last_statement()
print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

first_user, second_user = users[0], users[1]
print(first_user)
print(second_user)

# IMPORT the same rows back through encrypted HTTP transport
C.import_from_iterable(users, "users_copy")
stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")


# Connect with encryption AND certificate fingerprint check.
# SHA-256 of the DER-encoded certificate the server presented on the encrypted
# connection opened above (reads the private C._ws attribute; demo only).
server_fingerprint = hashlib.sha256(C._ws.sock.getpeercert(True)).hexdigest().upper()
print(f"Server certificate fingerprint: {server_fingerprint}")

# The first connection is no longer needed; close it instead of leaking it.
C.close()

# Insert "/<fingerprint>" after the host part of the DSN: before ":port" when
# present (every ":" is replaced), otherwise at the very end.
if ":" in config.dsn:
    dsn_with_valid_fingerprint = config.dsn.replace(":", f"/{server_fingerprint}:")
    dsn_with_invalid_fingerprint = config.dsn.replace(":", "/123abc:")
else:
    dsn_with_valid_fingerprint = f"{config.dsn}/{server_fingerprint}"
    dsn_with_invalid_fingerprint = f"{config.dsn}/123abc"

C = pyexasol.connect(
    dsn=dsn_with_valid_fingerprint,
    user=config.user,
    password=config.password,
    schema=config.schema,
    encryption=True,
)
print("Encrypted connection with fingerprint validation was established")
C.close()


# Invalid fingerprint causes exception
try:
    pyexasol.connect(
        dsn=dsn_with_invalid_fingerprint,
        user=config.user,
        password=config.password,
        schema=config.schema,
        encryption=True,
    )
except pyexasol.ExaConnectionFailedError as e:
    print(e)

# Valid fingerprint without encryption causes exception
try:
    pyexasol.connect(
        dsn=dsn_with_valid_fingerprint,
        user=config.user,
        password=config.password,
        schema=config.schema,
    )
except pyexasol.ExaConnectionDsnError as e:
    print(e)

Custom Session Parameters

passing custom session parameters client_name, client_version, etc.
"""
Custom client name, client version, other session parameters
Useful for logging
"""

import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(width=140, indent=4)

# First a normal session, then one with custom client attributes. Each one
# prints its own row from EXA_DBA_SESSIONS so the difference is visible.
for extra_session_params in (
    {},
    {
        "client_name": "MyCustomClient",
        "client_version": "1.2.3",
        "client_os_username": "small_cat",
    },
):
    C = pyexasol.connect(
        dsn=config.dsn,
        user=config.user,
        password=config.password,
        schema=config.schema,
        fetch_dict=True,
        **extra_session_params,
    )

    st = C.execute("SELECT * FROM EXA_DBA_SESSIONS WHERE session_id=CURRENT_SESSION")
    printer.pprint(st.fetchall())

    C.close()

Local Config File

connect using local config file
"""
Local config file
"""

import configparser
import pathlib
import pprint
import tempfile

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(indent=4, width=140)

# Generate tmp file with sample config
with tempfile.TemporaryDirectory() as tempdir:
    config_path = pathlib.Path(tempdir) / "test.ini"

    parser = configparser.ConfigParser()

    # ConfigParser stores non-string values via str() (e.g. True -> "True")
    parser["test1"] = {
        "dsn": config.dsn,
        "user": config.user,
        "password": config.password,
        "schema": config.schema,
        "compression": True,
        "encryption": False,
        "socket_timeout": 20,
    }

    # The context manager closes the file even if writing fails
    with open(config_path, "w", encoding="utf-8") as handle:
        parser.write(handle)

    # Show the generated file
    print(config_path.read_text(encoding="utf-8"))

    # Open connection using config file
    C = pyexasol.connect_local_config("test1", config_path=str(config_path))

    # Basic query
    stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
    printer.pprint(stmt.fetchall())

    # Close the connection before the temporary directory is removed
    C.close()

Profiling

last query profiling
"""
Profiling with and without details
"""

import json
import pprint
import sys

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(width=140, indent=4)

C = pyexasol.connect(
    dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)

# Turn off the query cache and turn on profiling for this session
C.execute("ALTER SESSION SET QUERY_CACHE = 'OFF'")
C.execute("ALTER SESSION SET PROFILE = 'ON'")

# Top 10 users by total payment amount; executed twice below
top_users_query = """
    SELECT u.user_id, sum(p.gross_amt) AS total_gross_amt
    FROM users u
        LEFT JOIN payments p ON (u.user_id=p.user_id)
    GROUP BY 1
    ORDER BY 2 DESC NULLS LAST
    LIMIT 10
"""

# Normal profiling
stmt = C.execute(top_users_query)
printer.pprint(stmt.fetchall())
json.dump(C.ext.explain_last(), sys.stdout, indent=4)

# Profiling with extra details per node (IPROC column)
C.execute(top_users_query)
json.dump(C.ext.explain_last(details=True), sys.stdout, indent=4)

Snapshot Transactions

snapshot transactions mode, which may help with metadata locking problems
"""
Snapshot transactions
Explanations about locks: https://exasol.my.site.com/s/article/WAIT-FOR-COMMIT-on-SELECT-statement?language=en_US
"""

import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(width=140, indent=4)

# Session 1: read TAB1 and write TAB2, leaving the transaction open
C1 = pyexasol.connect(
    dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
C1.set_autocommit(False)
C1.execute("SELECT * FROM TAB1")
C1.execute("INSERT INTO TAB2 VALUES (1)")

# Session 2: write TAB1 and commit
C2 = pyexasol.connect(
    dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
C2.set_autocommit(False)
C2.execute("INSERT INTO TAB1 VALUES(1)")
C2.commit()

# Session 3: snapshot transactions enabled, reads metadata of TAB2
C3 = pyexasol.connect(
    dsn=config.dsn,
    user=config.user,
    password=config.password,
    schema=config.schema,
    snapshot_transactions=True,
    debug=True,
)

# Without snapshot transactions Exasol would lock on this query
stmt = C3.execute(
    "SELECT column_name, column_type FROM EXA_ALL_COLUMNS WHERE column_schema=CURRENT_SCHEMA AND column_table='TAB2'"
)
printer.pprint(stmt.fetchall())

UDF Script Output

run query with UDF script and capture output (may not work on a local laptop)
"""
Script output server
Exasol should be able to open connection to the host where current script is running
May not work on laptops due to firewall and network security restrictions
"""

import os
import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(width=140, indent=4)

is_travis = "TRAVIS" in os.environ

# In Travis, the UDF output server needs an explicit bind address plus an
# address Exasol can connect back to (172.17.0.1 is the docker host IP on
# Linux). Everywhere else both are left as None.
C = pyexasol.connect(
    dsn=config.dsn,
    user=config.user,
    password=config.password,
    schema=config.schema,
    query_timeout=5,
    udf_output_bind_address=("", 8580) if is_travis else None,
    udf_output_connect_address=("172.17.0.1", 8580) if is_travis else None,
)

# Script output from several VMs (rows are spread over a few random groups)
stmt, log_files = C.execute_udf_output(
    """
    SELECT echo_java(user_id)
    FROM users
    GROUP BY CEIL(RANDOM() * 4)
"""
)

printer.pprint(stmt.fetchall())
printer.pprint(log_files)

first_log = log_files[0]
print(first_log.read_text())

# LIMIT 0 means no rows, so no VMs are started and no log files are created;
# execute_udf_output must return instead of hanging
stmt, log_files = C.execute_udf_output(
    """
    WITH cte_users AS (
        SELECT user_id
        FROM users
        ORDER BY null
        LIMIT 0
    )
    SELECT echo_java(user_id)
    FROM cte_users
    GROUP BY CEIL(RANDOM() * 4)
"""
)

printer.pprint(stmt.fetchall())
printer.pprint(log_files)

print(f"SELECTED {stmt.rowcount()} rows in {stmt.execution_time}s")

Class Extension

extend core PyEXASOL classes to add custom logic
"""
Extend core PyEXASOL classes, add custom logic
In this example we add print_session_id() custom method to all objects
"""

import collections
import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(width=140, indent=4)


class CustomExaStatement(pyexasol.ExaStatement):
    """Statement subclass that can report the session it belongs to."""

    def print_session_id(self):
        session_id = self.connection.session_id()
        print(f"Statement session_id: {session_id}")


class CustomExaFormatter(pyexasol.ExaFormatter):
    """Formatter subclass that can report the session it belongs to."""

    def print_session_id(self):
        session_id = self.connection.session_id()
        print(f"Formatter session_id: {session_id}")


class CustomExaLogger(pyexasol.ExaLogger):
    """Logger subclass that can report the session it belongs to."""

    def print_session_id(self):
        session_id = self.connection.session_id()
        print(f"Logger session_id: {session_id}")


class CustomExaExtension(pyexasol.ExaExtension):
    """Extension subclass that can report the session it belongs to."""

    def print_session_id(self):
        session_id = self.connection.session_id()
        print(f"Extension session_id: {session_id}")


class CustomExaMetaData(pyexasol.ExaMetaData):
    """Metadata subclass that can report the session it belongs to."""

    def print_session_id(self):
        session_id = self.connection.session_id()
        print(f"MetaData session_id: {session_id}")


class CustomExaConnection(pyexasol.ExaConnection):
    """Connection that plugs in the Custom* sub-classes.

    Also accepts an extra ``custom_param`` keyword. It is printed and removed
    before the remaining keyword arguments are passed to ExaConnection.
    """

    # Sub-classes used for the connection's helper objects
    cls_statement = CustomExaStatement
    cls_formatter = CustomExaFormatter
    cls_logger = CustomExaLogger
    cls_extension = CustomExaExtension
    cls_meta = CustomExaMetaData

    def __init__(self, **kwargs):
        if "custom_param" in kwargs:
            custom_value = kwargs.pop("custom_param")
            print(f"Custom connection parameter: {custom_value}")

        super().__init__(**kwargs)


C = CustomExaConnection(
    dsn=config.dsn,
    user=config.user,
    password=config.password,
    schema=config.schema,
    custom_param="test custom param!",
)

stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")

# Every helper object is an instance of the overloaded class
for custom_obj in (stmt, C.format, C.logger, C.ext, C.meta):
    custom_obj.print_session_id()

C.close()

print("Return result rows as named tuples")


class NamedTupleExaStatement(pyexasol.ExaStatement):
    """Statement that yields each row as a ``Row`` named tuple.

    Field names are taken from the result set's column names. ``rename=True``
    lets ``namedtuple`` replace names that are not valid Python identifiers,
    are keywords, start with an underscore, or are duplicated (e.g. the same
    column selected from two joined tables) with positional names such as
    ``_1``. Without it, such result sets raise ValueError.
    """

    def __next__(self):
        # The base class yields a plain row; wrap it in the named tuple type
        row = super().__next__()
        return self.cls_row(*row)

    def _init_result_set(self, res):
        super()._init_result_set(res)
        # Build the row type once per result set from its column names
        self.cls_row = collections.namedtuple("Row", self.col_names, rename=True)


class NamedTupleExaConnection(pyexasol.ExaConnection):
    """Connection whose statements are NamedTupleExaStatement instances."""

    cls_statement = NamedTupleExaStatement


C = NamedTupleExaConnection(
    dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)
stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")

# Print the first two rows
for _ in range(2):
    print(stmt.fetchone())

C.close()

Quoted Identifiers

enable quoted identifiers for import_*, export_* and other relevant functions
"""
Major cases when 'quote_ident' connection option takes effect
"""

import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(width=140, indent=4)

# quote_ident=True; no schema is passed here, it is opened explicitly below
C = pyexasol.connect(
    dsn=config.dsn, user=config.user, password=config.password, quote_ident=True
)

C.open_schema(config.schema)

# EXPORT from a table whose name contains lower case characters
pd = C.export_to_pandas("camelCaseTable")
pd.info()

stmt = C.last_statement()
print(f"EXPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

# IMPORT the same data back into that table
C.import_from_pandas(pd, "camelCaseTable")

stmt = C.last_statement()
print(f"IMPORTED {stmt.rowcount()} rows in {stmt.execution_time}s")

# ext helpers: plain name, SQL with the !q formatter, (schema, table) tuple,
# and get_sys_columns
cols = C.ext.get_columns("camelCaseTable")
printer.pprint(cols)

cols = C.ext.get_columns_sql(
    "SELECT * FROM {table_name!q}", {"table_name": "camelCaseTable"}
)
printer.pprint(cols)

cols = C.ext.get_columns((config.schema, "camelCaseTable"))
printer.pprint(cols)

cols = C.ext.get_sys_columns("camelCaseTable")
printer.pprint(cols)

Thread Safety

built-in protection from accessing connection object from multiple threads simultaneously
"""
Attempt to access connection object from multiple threads simultaneously
"""

import pprint
import threading
import time

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(width=140, indent=4)


class QueryThread(threading.Thread):
    """Thread that runs one heavy query on a (possibly shared) connection."""

    def __init__(self, connection):
        self.connection = connection
        super().__init__()

    def run(self):
        # Cross join, heavy enough to still be running when it gets aborted
        heavy_query = "SELECT * FROM users a, users b, users c, payments d"
        try:
            self.connection.execute(heavy_query)
        except (pyexasol.ExaQueryAbortError, pyexasol.ExaConcurrencyError) as err:
            # The query was aborted, or another thread was already using
            # the same connection object
            print(err.message)


# Basic connect
C = pyexasol.connect(
    dsn=config.dsn, user=config.user, password=config.password, schema=config.schema
)

# Two threads share one connection object
query_thread_1, query_thread_2 = QueryThread(C), QueryThread(C)

for query_thread in (query_thread_1, query_thread_2):
    query_thread.start()

# Let the query start, then abort it from the main thread
time.sleep(1)
C.abort_query()

for query_thread in (query_thread_1, query_thread_2):
    query_thread.join()

DSN Parsing

parsing of complex connection strings and catching relevant exceptions
"""
Demonstration of DSN (Connection string) expansion and related exceptions
"""

import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(width=140, indent=4)


C = pyexasol.connect(
    dsn=config.dsn,
    user=config.user,
    password=config.password,
    schema=config.schema,
    verbose_error=False,
    encryption=True,
)

# Expansions that succeed; printed sorted for stable output
for title, dsn in (
    ("IP range with custom port: ", "127.0.0.10..19:8564"),
    (
        "Multiple ranges with multiple ports and with default port at the end: ",
        "127.0.0.10..19:8564,127.0.0.20,localhost:8565,127.0.0.21..23",
    ),
    (
        "Multiple ranges with fingerprint and port: ",
        "127.0.0.10..19/ABC,127.0.0.20,localhost/CDE:8564",
    ),
):
    print(title)
    result = C._process_dsn(dsn)
    printer.pprint(sorted(result))

# Edge cases: empty DSN, invalid (reversed) range, hostname that cannot be
# resolved, hostname range with zero-padding
for dsn in (" ", "127.0.0.15..10", "test1..5.zlan", "test01..20.zlan"):
    try:
        result = C._process_dsn(dsn)
        printer.pprint(result)
    except pyexasol.ExaConnectionDsnError as e:
        print(e)

HTTP Proxy

connection via HTTP proxy
"""
Open connection with HTTP proxy
"""

import pprint
import subprocess
import time

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(indent=4, width=140)

# Simple HTTP proxy
pproxy = subprocess.Popen(["pproxy", "-l", "http://:8562/", "--reuse"])
try:
    # Give the proxy a moment to start listening
    time.sleep(1)

    C = pyexasol.connect(
        dsn=config.dsn,
        user=config.user,
        password=config.password,
        schema=config.schema,
        http_proxy="http://localhost:8562",
    )

    stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
    printer.pprint(stmt.fetchall())

    C.close()
finally:
    # Stop the proxy even if connecting or querying failed, and wait for it to
    # exit so port 8562 is free before the next proxy is started on it.
    pproxy.terminate()
    pproxy.wait()


# HTTP proxy with auth
pproxy = subprocess.Popen(
    ["pproxy", "-l", "http://:8562/#my_user:secret_pass", "--reuse"]
)
try:
    time.sleep(1)

    C = pyexasol.connect(
        dsn=config.dsn,
        user=config.user,
        password=config.password,
        schema=config.schema,
        http_proxy="http://my_user:secret_pass@localhost:8562",
    )

    stmt = C.execute("SELECT * FROM users ORDER BY user_id LIMIT 5")
    printer.pprint(stmt.fetchall())

    C.close()
finally:
    pproxy.terminate()
    pproxy.wait()

Garbage Collection

detect potential garbage collection problems due to cross-references
"""
Ensure ExaConnection and ExaStatement objects are garbage collected properly
"""

import gc
import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(width=140, indent=4)


class ExaStatementVerbose(pyexasol.ExaStatement):
    """ExaStatement that prints a message when it is finalized."""

    def __del__(self):
        session_id = self.connection.session_id()
        print(
            f"ExaStatement session_id={session_id}, "
            f"stmt_idx={self.stmt_idx} is about to be collected"
        )
        super().__del__()


class ExaConnectionVerbose(pyexasol.ExaConnection):
    """ExaConnection that prints a message when it is finalized.

    Its statements are ExaStatementVerbose instances, so they announce
    their finalization too.
    """

    cls_statement = ExaStatementVerbose

    def __del__(self):
        session_id = self.session_id()
        print(f"ExaConnection {session_id} is about to be collected")
        super().__del__()


def run_test():
    """Create verbose connections/statements and drop the references to them.

    Part 1: two statements are executed on one connection. Then both local
    names are deleted and gc.collect() is called explicitly, so the verbose
    __del__ messages are expected between "Collect call 1 start" and
    "Collect call 1 finish".

    Part 2: a new connection and statement are left in local variables. They
    become unreachable only when this function returns.
    """
    C = ExaConnectionVerbose(
        dsn=config.dsn,
        user=config.user,
        password=config.password,
        schema=config.schema,
        debug=False,
        # Very small fetch size; presumably so result sets are fetched in
        # several chunks and stay open on the server — TODO confirm intent
        fetch_size_bytes=1024,
    )

    # Execute statement, read some data
    st = C.execute("SELECT * FROM users")
    st.fetchmany(5)

    # Execute another statement, no more references for the first statement
    st = C.execute("SELECT * FROM payments")
    st.fetchmany(5)

    # Drop the remaining references before the explicit collection below
    del st
    del C

    print("Collect call 1 start")
    gc.collect()
    print("Collect call 1 finish")

    C = ExaConnectionVerbose(
        dsn=config.dsn,
        user=config.user,
        password=config.password,
        schema=config.schema,
        debug=False,
        fetch_size_bytes=1024,
    )

    # Left referenced by locals on purpose; released when run_test() returns
    st = C.execute("SELECT * FROM users")
    st.fetchmany(5)


run_test()

# The objects created in the second half of run_test() are unreachable now;
# collect them explicitly.
print("Collect call 2 start")
gc.collect()
print("Collect call 2 finish")

print("Finishing script, nothing should be collected beyond this point!")

Examples of JSON libraries used for fetching

RapidJSON

JSON library rapidjson
"""
JSON library "rapidjson"
"""

import decimal
import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(width=140, indent=4)

# Same connection parameters as usual, plus json_lib="rapidjson"
C = pyexasol.connect(
    dsn=config.dsn,
    user=config.user,
    password=config.password,
    schema=config.schema,
    json_lib="rapidjson",
)

# One row per scenario: upper limits, lower limits, all NULLs
edge_cases = [
    {
        "dec36_0": decimal.Decimal("+" + "9" * 36),
        "dec36_36": decimal.Decimal("+0." + "9" * 36),
        "dbl": 1.7e308,
        "bl": True,
        "dt": "9999-12-31",
        "ts": "9999-12-31 23:59:59.999",
        "var100": "ひ" * 100,
        "var2000000": "ひ" * 2000000,
    },
    {
        "dec36_0": decimal.Decimal("-" + "9" * 36),
        "dec36_36": decimal.Decimal("-0." + "9" * 36),
        "dbl": -1.7e308,
        "bl": False,
        "dt": "0001-01-01",
        "ts": "0001-01-01 00:00:00",
        "var100": "",
        "var2000000": "ひ",
    },
    dict.fromkeys(
        ("dec36_0", "dec36_36", "dbl", "bl", "dt", "ts", "var100", "var2000000")
    ),
]

insert_q = "INSERT INTO edge_case VALUES ({dec36_0!d}, {dec36_36!d}, {dbl!f}, {bl}, {dt}, {ts}, {var100}, {var2000000})"
select_q = "SELECT dec36_0, dec36_36, dbl, bl, dt, ts, var100, LENGTH(var2000000) AS len_var FROM edge_case"

C.execute("TRUNCATE TABLE edge_case")

# Insert every scenario (exercises the !d / !f formatters and NULL handling)
for edge_case in edge_cases:
    C.execute(insert_q, dict(edge_case))

# Read everything back
stmt = C.execute(select_q)
printer.pprint(stmt.fetchall())


# Same round trip again, this time fetching through "exasol_mapper"
C.options["fetch_mapper"] = pyexasol.exasol_mapper
C.execute("TRUNCATE TABLE edge_case")

for edge_case in edge_cases:
    C.execute(insert_q, dict(edge_case))

stmt = C.execute(select_q)
printer.pprint(stmt.fetchall())

# Import and export
edge_tuples = C.execute("SELECT * FROM edge_case").fetchall()

UJSON

JSON library ujson
"""
JSON library "ujson"
"""

import decimal
import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(width=140, indent=4)

# Same connection parameters as usual, plus json_lib="ujson"
C = pyexasol.connect(
    dsn=config.dsn,
    user=config.user,
    password=config.password,
    schema=config.schema,
    json_lib="ujson",
)

# One row per scenario: upper limits, lower limits, all NULLs
edge_cases = [
    {
        "dec36_0": decimal.Decimal("+" + "9" * 36),
        "dec36_36": decimal.Decimal("+0." + "9" * 36),
        "dbl": 1.7e308,
        "bl": True,
        "dt": "9999-12-31",
        "ts": "9999-12-31 23:59:59.999",
        "var100": "ひ" * 100,
        "var2000000": "ひ" * 2000000,
    },
    {
        "dec36_0": decimal.Decimal("-" + "9" * 36),
        "dec36_36": decimal.Decimal("-0." + "9" * 36),
        "dbl": -1.7e308,
        "bl": False,
        "dt": "0001-01-01",
        "ts": "0001-01-01 00:00:00",
        "var100": "",
        "var2000000": "ひ",
    },
    dict.fromkeys(
        ("dec36_0", "dec36_36", "dbl", "bl", "dt", "ts", "var100", "var2000000")
    ),
]

insert_q = "INSERT INTO edge_case VALUES ({dec36_0!d}, {dec36_36!d}, {dbl!f}, {bl}, {dt}, {ts}, {var100}, {var2000000})"
select_q = "SELECT dec36_0, dec36_36, dbl, bl, dt, ts, var100, LENGTH(var2000000) AS len_var FROM edge_case"

C.execute("TRUNCATE TABLE edge_case")

# Insert every scenario (exercises the !d / !f formatters and NULL handling)
for edge_case in edge_cases:
    C.execute(insert_q, dict(edge_case))

# Read everything back
stmt = C.execute(select_q)
printer.pprint(stmt.fetchall())


# Same round trip again, this time fetching through "exasol_mapper"
C.options["fetch_mapper"] = pyexasol.exasol_mapper
C.execute("TRUNCATE TABLE edge_case")

for edge_case in edge_cases:
    C.execute(insert_q, dict(edge_case))

stmt = C.execute(select_q)
printer.pprint(stmt.fetchall())

# Import and export
edge_tuples = C.execute("SELECT * FROM edge_case").fetchall()

ORJSON

JSON library orjson
"""
JSON library "orjson"
"""

import decimal
import pprint

import _config as config

import pyexasol

printer = pprint.PrettyPrinter(indent=4, width=140)

# Basic connect
C = pyexasol.connect(
    dsn=config.dsn,
    user=config.user,
    password=config.password,
    schema=config.schema,
    json_lib="orjson",
)

edge_cases = [
    # Biggest values
    {
        "dec36_0": decimal.Decimal("+" + ("9" * 36)),
        "dec36_36": decimal.Decimal("+0." + ("9" * 36)),
        "dbl": 1.7e308,
        "bl": True,
        "dt": "9999-12-31",
        "ts": "9999-12-31 23:59:59.999",
        "var100": "ひ" * 100,
        "var2000000": "ひ" * 2000000,
    },
    # Smallest values
    {
        "dec36_0": decimal.Decimal("-" + ("9" * 36)),
        "dec36_36": decimal.Decimal("-0." + ("9" * 36)),
        "dbl": -1.7e308,
        "bl": False,
        "dt": "0001-01-01",
        "ts": "0001-01-01 00:00:00",
        "var100": "",
        "var2000000": "ひ",
    },
    # All nulls
    {
        "dec36_0": None,
        "dec36_36": None,
        "dbl": None,
        "bl": None,
        "dt": None,
        "ts": None,
        "var100": None,
        "var2000000": None,
    },
]

insert_q = "INSERT INTO edge_case VALUES ({dec36_0!d}, {dec36_36!d}, {dbl!f}, {bl}, {dt}, {ts}, {var100}, {var2000000})"
select_q = "SELECT dec36_0, dec36_36, dbl, bl, dt, ts, var100, LENGTH(var2000000) AS len_var FROM edge_case"

C.execute("TRUNCATE TABLE edge_case")

# Insert (test formatting)
C.execute(insert_q, dict(edge_cases[0]))
C.execute(insert_q, dict(edge_cases[1]))
C.execute(insert_q, dict(edge_cases[2]))

# Select and fetch
stmt = C.execute(select_q)
printer.pprint(stmt.fetchall())


# Same actions with "exasol_mapper"
C.options["fetch_mapper"] = pyexasol.exasol_mapper
C.execute("TRUNCATE TABLE edge_case")

# Insert (test formatting)
C.execute(insert_q, dict(edge_cases[0]))
C.execute(insert_q, dict(edge_cases[1]))
C.execute(insert_q, dict(edge_cases[2]))

# Select and fetch
stmt = C.execute(select_q)
printer.pprint(stmt.fetchall())

# Import and export
edge_tuples = C.execute("SELECT * FROM edge_case").fetchall()