lakeshack.metastore¶
Copyright (C) 2022 Matthew Hendrey
This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
-
class lakeshack.metastore.Metastore(store_url: str, store_table: str, arrow_schema: pyarrow.lib.Schema, cluster_column: str = None, *optional_columns: str, **store_kwargs)[source]¶
Bases: object
Store metadata from parquet files into a database using SQLAlchemy.
- Parameters
store_url (str) – URL string to backend database. See sqlalchemy.create_engine() for more details and examples.
store_table (str) – Name of the table storing the parquet metadata
arrow_schema (pa.lib.Schema) – Arrow schema of the parquet files
cluster_column (str, optional) – Name of the column in the Arrow schema used for clustering the data. If None (default), the table is expected to already exist.
*optional_columns (str) – Additional columns in the Arrow schema whose metadata is to be stored in the metastore. These should have some clustering to be useful, even if less than in the cluster_column.
**store_kwargs (Any, optional) – Arguments to be passed to sqlalchemy.create_engine()
-
engine¶ SQLAlchemy engine that connects to the database
- Type
sa.engine.base.Engine
-
table¶ SQLAlchemy table holding the metadata
- Type
sa.sql.schema.Table
Example
from pyarrow import fs
import pyarrow.dataset as ds
from lakeshack.metastore import Metastore

s3 = fs.S3FileSystem(region="us-iso-east-1")
s3_dir = "sales_data/2023/03/15/"
dataset = ds.dataset(s3_dir, filesystem=s3, format="parquet")

metastore = Metastore(
    "sqlite:///:memory:",
    "sales_table",
    dataset.schema,
    "customer_id",
    "timestamp",
)
metastore.update(s3_dir, filesystem=s3, n_workers=30)
-
query(cluster_column_values: List[Any], optional_where_clauses: List[Tuple] = []) → Dict[str, List[Any]][source]¶
Given the cluster_column_values, return the filepaths of the parquet files whose min/max range contains a cluster_column_value.
If optional_where_clauses are provided, then further restrict the returned filepaths so they also match these conditions. A short usage sketch follows the parameter list below.
- Parameters
cluster_column_values (List[Any]) – List of cluster column values of interest
optional_where_clauses (List[Tuple], optional) – List of optional columns to further restrict the parquet files to be queried. Each tuple is three values: column_name, comparison operator [>=, >, =, ==, <, <=], and value
- Returns
Keys are the filepaths to the parquet files. Values are a list of the cluster column values associated with that parquet file
- Return type
Dict[str, List[Any]]
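For instance, continuing the Metastore built in the example above, a query might look like the following sketch (the customer ids, the timestamp cutoff, and any printed paths are illustrative assumptions, not actual output):

# Which parquet files might hold customer_ids 55 or 1234 with records
# on/after March 15, 2023? (values below are illustrative only)
filepaths = metastore.query(
    [55, 1234],
    optional_where_clauses=[("timestamp", ">=", "2023-03-15 00:00:00")],
)
for filepath, customer_ids in filepaths.items():
    print(filepath, customer_ids)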
-
update(parquet_file_or_dir: str, filesystem: pyarrow._fs.FileSystem = <pyarrow._fs.LocalFileSystem object>, n_workers: int = 16) → None[source]¶
Add parquet file metadata to the metastore. If a directory is provided, then a recursive walk is done. Any non-parquet files are simply skipped and logged. A minimal local-filesystem sketch follows the parameter list below.
- Parameters
parquet_file_or_dir (str) – Provide the filepath to either a single parquet file or a directory that contains many parquet files.
filesystem (fs.FileSystem, optional) – PyArrow file system where parquet files are located. Default is fs.LocalFileSystem()
n_workers (int, optional) – Size of the thread pool used to concurrently retrieve parquet file metadata. Default is 16
- Returns
- Return type
None
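As a minimal sketch of a local run, assuming a hypothetical local directory of parquet files (the path is made up, and the default fs.LocalFileSystem() is spelled out only for clarity):

from pyarrow import fs

# Recursively walk a local directory (hypothetical path) and store each
# parquet file's min/max column metadata; non-parquet files are skipped
metastore.update(
    "/data/sales/2023/03/15/",
    filesystem=fs.LocalFileSystem(),
    n_workers=8,
)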
lakeshack.lakeshack¶
Copyright (C) 2022 Matthew Hendrey
This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
-
class lakeshack.lakeshack.Lakeshack(metastore: lakeshack.metastore.Metastore, filesystem: pyarrow._fs.FileSystem = <pyarrow._fs.LocalFileSystem object>)[source]¶
Bases: object
Retrieve records stored in parquet files by first checking the Metastore to determine which parquet files might have the requested records and then querying only those files.
- Parameters
metastore (Metastore) – Metastore containing the parquet file metadata that lakeshack will use
filesystem (fs.FileSystem, optional) – PyArrow file system. Use fs.S3FileSystem(region="your_region") for connecting to S3. Default is fs.LocalFileSystem()
-
s3_table¶ Empty SQLAlchemy table used to make it easier to construct the SQL expression that gets passed to S3 Select.
- Type
sa.sql.schema.Table
Example
from lakeshack.lakeshack import Lakeshack
from lakeshack.metastore import Metastore
from pyarrow import fs
import pyarrow.dataset as ds

s3 = fs.S3FileSystem(region="us-iso-east-1")
s3_dir = "sales_data/2023/03/15/"
dataset = ds.dataset(s3_dir, filesystem=s3, format="parquet")

# Connect to an existing metastore
metastore = Metastore("sqlite:///sales.db", "sales_table", dataset.schema)
lakeshack = Lakeshack(metastore, s3)

pa_table = lakeshack.query(55)

# Use S3 Select with a thread pool of 50 workers
pa_s3_table = lakeshack.query_s3_select(
    55,
    n_workers=50,
)
-
query(cluster_column_values, optional_where_clauses: List[Tuple] = [], columns: List[str] = None, batch_size: int = 131072, n_records_max: int = 2000000) → pyarrow.lib.Table[source]¶
Retrieve records from the parquet files using pyarrow's isin(cluster_column_values) in batches of batch_size. Stop returning results once n_records_max records have been returned. See the sketch after the parameter list below.
- Parameters
cluster_column_values (Union[str, List]) – The cluster_column value(s) you want to match on. Provide a single value or a list of many
optional_where_clauses (List[Tuple], optional) – List of optional columns to further restrict the parquet files to be queried. Each tuple is three values: column_name, comparison operator [>=, >, =, ==, <, <=], and value. These are logically connected using "AND" between each clause in the list and with the cluster_column_values
columns (List[str], optional) – The columns to return from parquet files. Default is None (all of them)
batch_size (int, optional) – Retrieve batches of this size from the dataset. Lower this value to manage RAM if needed. Default is 128 * 1024, same as pyarrow.Dataset.to_batches()
n_records_max (int, optional) – Once this many results have been exceeded, stop retrieving results and return what has been retrieved so far. Thus, the returned Table.num_rows will be less than or equal to n_records_max + batch_size. Default is 2,000,000
- Returns
- Return type
pa.lib.Table
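As a sketch, continuing the Lakeshack example above (the "timestamp" and "amount" columns, the cutoff value, and the limits are assumptions about the sales schema):

# Pull three columns for customer_id 55, restricted by timestamp, and
# stop once roughly 500,000 records have been gathered
# ("timestamp" and "amount" are assumed columns in the sales schema)
pa_table = lakeshack.query(
    55,
    optional_where_clauses=[("timestamp", ">=", "2023-03-15 00:00:00")],
    columns=["customer_id", "timestamp", "amount"],
    batch_size=65536,
    n_records_max=500000,
)
print(pa_table.num_rows)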
-
query_s3_select(cluster_column_values, optional_where_clauses: List[Tuple] = [], columns: List[str] = None, n_records_max: int = 2000000, n_workers: int = 20) → pyarrow.lib.Table[source]¶
Retrieve records from parquet files stored in S3 via S3 Select, using a thread pool to speed up the queries. An illustrative call follows the parameter list below.
- Parameters
cluster_column_values (str, List[str]) – Provide a single cluster column value or a list of them whose records you want to retrieve
optional_where_clauses (List[Tuple], optional) – List of optional columns to further restrict the parquet files to be queried. Each tuple is three values: column_name, comparison operator [>=, >, =, ==, <, <=], and value. These are logically connected using "AND" between each clause in the list and with the cluster_column_values
columns (List[str], optional) – Specify the subset of columns to select. Default is None, which selects all columns (select *)
n_records_max (int, optional) – Once this many results have been exceeded, stop retrieving results and return what has been retrieved so far. Thus, the returned Table.num_rows may be greater than n_records_max. Default is 2,000,000
n_workers (int, optional) – Number of workers in the ThreadPool to launch parallel calls to S3 Select. Default is 20.
- Returns
- Return type
pa.lib.Table
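A comparable sketch for S3 Select, reusing the Lakeshack from the example above (again, the where clause and the "amount" column are assumptions about the schema):

# Fan S3 Select calls out across 50 threads for two customer_ids,
# returning only the "customer_id" and assumed "amount" columns
pa_s3_table = lakeshack.query_s3_select(
    [55, 1234],
    optional_where_clauses=[("timestamp", ">=", "2023-03-15 00:00:00")],
    columns=["customer_id", "amount"],
    n_records_max=1000000,
    n_workers=50,
)
print(pa_s3_table.num_rows)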