lakeshack.metastore

Copyright (C) 2022 Matthew Hendrey

This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.

class lakeshack.metastore.Metastore(store_url: str, store_table: str, arrow_schema: pyarrow.lib.Schema, cluster_column: str = None, *optional_columns: str, **store_kwargs)[source]

Bases: object

Store metadata from parquet files in a database using SQLAlchemy

Parameters
  • store_url (str) – URL string to backend database. See sqlalchemy.create_engine() for more details and examples.

  • store_table (str) – Name of the table storing the parquet metadata

  • arrow_schema (pa.lib.Schema) – Arrow schema of the parquet files

  • cluster_column (str, optional) – Name of the column in the Arrow schema used for clustering the data. If None (default), the table is expected to already exist.

  • *optional_columns (str) – Additional columns in the Arrow schema whose metadata is to be stored in the metastore. These should have some degree of clustering to be useful, even if less than the cluster_column

  • **store_kwargs (Any, optional) – Arguments to be passed to sqlalchemy.create_engine()

engine

SQLAlchemy engine that connects to the database

Type

sa.engine.base.Engine

table

SQLAlchemy table holding the metadata

Type

sa.sql.schema.Table

Example

from pyarrow import fs
import pyarrow.dataset as ds
from lakeshack.metastore import Metastore

s3 = fs.S3FileSystem(region="us-iso-east-1")
s3_dir = "sales_data/2023/03/15/"
dataset = ds.dataset(s3_dir, filesystem=s3, format="parquet")

# Cluster on customer_id; also store min/max metadata for timestamp
metastore = Metastore("sqlite:///:memory:", "sales_table", dataset.schema,
    "customer_id", "timestamp")
metastore.update(s3_dir, filesystem=s3, n_workers=30)

query(cluster_column_values: List[Any], optional_where_clauses: List[Tuple] = []) → Dict[str, List[Any]][source]

Given the cluster_column_values, return the filepaths of the parquet files whose min/max range contains a cluster_column_value.

If optional_where_clauses are provided, the returned filepaths are further restricted to those matching these conditions as well.

Parameters
  • cluster_column_values (List[Any]) – List of cluster column values of interest

  • optional_where_clauses (List[Tuple], optional) – List of optional columns used to further restrict the parquet files to be queried. Each tuple holds three values: column_name, comparison operator [>=, >, =, ==, <, <=], and value

Returns

Keys are the filepaths to the parquet files. Values are a list of the cluster column values associated with that parquet file

Return type

Dict[str, List[Any]]
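
For illustration, a minimal sketch of a metastore query, assuming the metastore built in the example above (clustered on customer_id with timestamp as an optional column); the customer_id values and timestamp bound are made up.

# Find parquet files whose customer_id min/max range covers 55 or 1234,
# further restricted by the stored timestamp metadata.
parquet_files = metastore.query(
    [55, 1234],
    optional_where_clauses=[("timestamp", ">=", "2023-03-15T00:00:00")],
)
# Keys are parquet filepaths; values are the matching cluster column values,
# e.g. {"sales_data/2023/03/15/part-00.parquet": [55]}
for filepath, matched_values in parquet_files.items():
    print(filepath, matched_values)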

update(parquet_file_or_dir: str, filesystem: pyarrow._fs.FileSystem = <pyarrow._fs.LocalFileSystem object>, n_workers: int = 16) → None[source]

Add parquet file metadata to the metastore. If a directory is provided, then a recursive walk is done. Any non-parquet files are simply skipped and logged.

Parameters
  • parquet_file_or_dir (str) – Provide the filepath to either a single parquet file or a directory that contains many parquet files.

  • filesystem (fs.FileSystem, optional) – PyArrow file system where parquet files are located. Default is fs.LocalFileSystem()

  • n_workers (int, optional) – Size of the thread pool used to concurrently retrieve parquet file metadata. Default is 16

Returns

Return type

None
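
For illustration, a minimal sketch of updating the metastore from a local directory of parquet files; the directory path is hypothetical.

from pyarrow import fs

# Recursively walk the directory and store each parquet file's min/max
# metadata for the cluster and optional columns in the metastore.
local = fs.LocalFileSystem()
metastore.update("/data/sales_data/2023/03/16/", filesystem=local, n_workers=8)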

lakeshack.lakeshack

Copyright (C) 2022 Matthew Hendrey

This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.

class lakeshack.lakeshack.Lakeshack(metastore: lakeshack.metastore.Metastore, filesystem: pyarrow._fs.FileSystem = <pyarrow._fs.LocalFileSystem object>)[source]

Bases: object

Retrieve records stored in Parquet files by first checking the Metastore to determine which Parquet files might have the requested records and then only querying these files.

Parameters
  • metastore (Metastore) – Metastore containing the parquet file metadata that lakeshack will use

  • filesystem (fs.FileSystem, optional) – pyarrow file system. Use fs.S3FileSystem(region="your_region") for connecting to S3. Default is fs.LocalFileSystem()

s3_table

Empty SQLAlchemy table used to make it easier to construct the SQL expression that gets passed to S3 Select.

Type

sa.sql.schema.Table

Example

from lakeshack.lakeshack import Lakeshack
from lakeshack.metastore import Metastore
from pyarrow import fs
import pyarrow.dataset as ds

s3 = fs.S3FileSystem(region="us-iso-east-1")
s3_dir = "sales_data/2023/03/15/"
dataset = ds.dataset(s3_dir, filesystem=s3, format="parquet")

# Connect to an existing metastore
metastore = Metastore("sqlite:///sales.db", "sales_table", dataset.schema)
lakeshack = Lakeshack(metastore, s3)

# Read the candidate parquet files with pyarrow for customer_id 55
pa_table = lakeshack.query(55)
# Use S3 Select using a thread pool with 50 workers
pa_s3_table = lakeshack.query_s3_select(
    55,
    n_workers = 50,
)

query(cluster_column_values, optional_where_clauses: List[Tuple] = [], columns: List[str] = None, batch_size: int = 131072, n_records_max: int = 2000000) → pyarrow.lib.Table[source]

Retrieve records from the parquet files using pyarrow’s isin(cluster_column_values), reading in batches of batch_size. Stop returning results once n_records_max records have already been returned.

Parameters
  • cluster_column_values (Union[str, List]) – cluster_column value you want to match on. Provide one or a list of many

  • optional_where_clauses (List[Tuple], optional) – List of optional columns used to further restrict the parquet files to be queried. Each tuple holds three values: column_name, comparison operator [>=, >, =, ==, <, <=], and value. These are logically connected using “AND” between each clause in the list and with the cluster_column_values

  • columns (List[str], optional) – The columns to return from parquet files. Default is None (all of them)

  • batch_size (int, optional) – Retrieve batches of this size from the dataset. Lower this value to manage RAM if needed. Default is 128 * 1024, same as pyarrow.Dataset.to_batches()

  • n_records_max (int, optional) – Once this many records have been retrieved, stop retrieving results and return what has been gathered so far. Thus, the returned Table.num_rows will be less than or equal to n_records_max + batch_size. Default is 2,000,000

Returns

Return type

pa.lib.Table
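
For illustration, a minimal sketch of a query, assuming the lakeshack instance from the example above and the customer_id / timestamp columns of its metastore; the column names, values, and limits are made up.

# Pull records for two customers with timestamps on or after the bound,
# reading only the candidate parquet files identified by the metastore.
pa_table = lakeshack.query(
    [55, 1234],
    optional_where_clauses=[("timestamp", ">=", "2023-03-15T00:00:00")],
    columns=["customer_id", "timestamp"],
    batch_size=64 * 1024,      # smaller batches to limit RAM usage
    n_records_max=500_000,     # stop once this many records have been gathered
)
print(pa_table.num_rows)       # <= n_records_max + batch_size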

query_s3_select(cluster_column_values, optional_where_clauses: List[Tuple] = [], columns: List[str] = None, n_records_max: int = 2000000, n_workers: int = 20) → pyarrow.lib.Table[source]

Retrieve records from the parquet files stored in S3 via S3 Select, using a thread pool to speed up the queries.

Parameters
  • cluster_column_values (str, List[str]) – Provide a single cluster column value or a list of them whose records you want to retrieve

  • optional_where_clauses (List[Tuple], optional) – List of optional columns used to further restrict the parquet files to be queried. Each tuple holds three values: column_name, comparison operator [>=, >, =, ==, <, <=], and value. These are logically connected using “AND” between each clause in the list and with the cluster_column_values

  • columns (List[str], optional) – Specify the subset of columns to select. Default is None, which selects all columns (select *)

  • n_records_max (int, optional) – Once this many records have been retrieved, stop retrieving results and return what has been gathered so far. Thus, the returned Table.num_rows may be greater than n_records_max. Default is 2,000,000

  • n_workers (int, optional) – Number of workers in the ThreadPool to launch parallel calls to S3 Select. Default is 20.

Returns

Return type

pa.lib.Table
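
For illustration, a minimal sketch of an S3 Select query, again assuming the lakeshack instance and columns from the examples above.

# Issue S3 Select calls in parallel against the candidate parquet files.
pa_s3_table = lakeshack.query_s3_select(
    [55, 1234],
    optional_where_clauses=[("timestamp", ">=", "2023-03-15T00:00:00")],
    columns=["customer_id", "timestamp"],
    n_records_max=500_000,
    n_workers=50,              # size of the thread pool issuing S3 Select calls
)
print(pa_s3_table.num_rows)    # may be somewhat greater than n_records_max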