DataCatalog: analytic_tables

Before we can download the datasets that are part of the analytic_tables catalog, we need a username and password for SSH access to the remote machine that hosts them.

From our metadata catalog we can get all the information needed to open an SFTP connection to the machine that holds our dataset. First we check which datasets are in the catalog.


import pandas as pd
import pyarrow as pa
import pyarrow.parquet.encryption as pe

import src.utils as ut
import src.ftp as ftp

%load_ext autoreload
%autoreload 2

# Setup the root path of the application
project_path = ut.project_path()

# Load the metadata

meta_filename = [
    f"{ut.project_path(1)}/meta/mosquito_alert/analytic_tables.json",
    f"{ut.project_path(1)}/meta_ipynb/analytic_tables.html",
]
metadata = ut.load_metadata(meta_filename)

# Get contentUrl from metadata file
ut.info_meta(metadata)

Dataset: tigaserver_app_reports

1. Distribution by SFTP protocol from MosquitoAlert webserver

If we want to download the dataset tigaserver_app_reports, we only need its contentUrl information. Note that analytic_tables has many parts (datasets); one of them is tigaserver_app_reports, which is the second entry in the list under the hasPart key of the metadata file.
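
To make that indexing concrete, here is a hypothetical, heavily trimmed sketch of the structure this notebook assumes for the metadata (field names follow schema.org; the real analytic_tables.json contains many more attributes), together with how a contentUrl of this kind can be parsed:

# Hypothetical sketch of the metadata layout assumed in this notebook
# (field names follow schema.org; the real file has many more attributes).
example_metadata = {
    "name": "analytic_tables",
    "hasPart": [
        {"name": "some_other_dataset"},  # idx_hasPart = 0
        {
            "name": "tigaserver_app_reports",  # idx_hasPart = 1
            "distribution": [
                {
                    # idx_distribution = 0; the URL below is made up for illustration
                    "contentUrl": "sftp://someuser@example.org:22/data/tigaserver_app_reports.csv",
                }
            ],
        },
    ],
}

# With parse=True, ut.get_meta presumably returns the contentUrl parsed along
# these lines, which is why .hostname, .port, .username and .path are used below.
from urllib.parse import urlparse

parsed = urlparse(example_metadata["hasPart"][1]["distribution"][0]["contentUrl"])
print(parsed.hostname, parsed.port, parsed.username, parsed.path)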

# Get the dataset tigaserver_app_reports from the _analytic_tables_ catalog
idx_hasPart = 1
contentUrl, dataset_name, distr_name = ut.get_meta(
    metadata, idx_distribution=0, idx_hasPart=idx_hasPart, parse=True
)
dataset_metadata = metadata["hasPart"][idx_hasPart]

# Make folders for data download
path = f"{project_path}/data/{dataset_name}/{distr_name}"
ut.makedirs(path)
# Prompt for the user password to connect over SFTP
password = input(f"Enter {contentUrl.username} user password:")
# Get schema info
schema_parquet, schema_numpy, time_cols = ut.get_schema(dataset_metadata)
# Get the dataframe
df = ftp.read_csv_sftp(
    hostname=contentUrl.hostname,
    port=contentUrl.port,
    username=contentUrl.username,
    password=password,
    remotepath=contentUrl.path,
    parse_dates=time_cols,
    date_parser=lambda col: pd.to_datetime(col, utc=True, errors="coerce"),
)

df = ut.apply_schema(df, schema_numpy)
df.info()
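
The helper ftp.read_csv_sftp lives in src/ftp.py and is not reproduced in this notebook. A minimal sketch of what such a helper could look like, assuming paramiko is installed (the actual implementation may differ):

# Sketch of an SFTP CSV reader (assumption: the real src/ftp.py may differ).
import pandas as pd
import paramiko


def read_csv_sftp_sketch(hostname, port, username, password, remotepath, **read_csv_kwargs):
    # Open an SSH connection, start an SFTP session and stream the remote CSV into pandas.
    with paramiko.SSHClient() as client:
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(hostname, port=port, username=username, password=password)
        sftp = client.open_sftp()
        try:
            with sftp.open(remotepath, mode="r") as remote_file:
                return pd.read_csv(remote_file, **read_csv_kwargs)
        finally:
            sftp.close()
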
# Convert the dataframe to parquet table
table = pa.Table.from_pandas(df, schema=schema_parquet)

# Save reports on CSV or parquet
filename = f"{path}/dataset"
pa.parquet.write_table(
    table, f"{filename}.parquet"
)  # much smaller file size than CSV (requires pyarrow)
df.to_csv(f"{filename}.csv")  # roughly 10x the size of the parquet file
# Read parquet schema from file
ut.read_parquet_schema(f"{filename}.parquet")
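
As a quick sanity check (a usage sketch, assuming both files were written in the previous step), the plain parquet file can be read back and the on-disk sizes compared:

import os

import pyarrow.parquet as pq

table_back = pq.read_table(f"{filename}.parquet")
print(table_back.num_rows, "rows,", table_back.num_columns, "columns")
print("parquet:", os.path.getsize(f"{filename}.parquet"), "bytes")
print("csv:", os.path.getsize(f"{filename}.csv"), "bytes")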
from datetime import timedelta

filename_encrypted = f"{filename}_encrypted.parquet"

FOOTER_KEY = b"0123456789112345"
FOOTER_KEY_NAME = "footer_key"
COL_KEY = b"1234567890123450"
COL_KEY_NAME = "col_key"

encryption_config = pe.EncryptionConfiguration(
    cache_lifetime=timedelta(minutes=5.0),  # how long wrapped keys / KMS clients are cached
    footer_key=FOOTER_KEY_NAME,  # master key used for the footer
    column_keys={
        # master key name -> list of columns encrypted with that key
        COL_KEY_NAME: ["note", "user_id"],
    },
    encryption_algorithm="AES_GCM_V1",
    data_key_length_bits=256,
    plaintext_footer=True,  # footer stays readable; only the listed columns are encrypted
    internal_key_material=True,  # store key material inside the parquet file itself
)

# In-memory "KMS": simply map the master key names to the key bytes defined above
kms_connection_config = pe.KmsConnectionConfig(
    custom_kms_conf={
        FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
        COL_KEY_NAME: COL_KEY.decode("UTF-8"),
    }
)


def kms_factory(kms_connection_configuration):
    return ut.InMemoryKmsClient(kms_connection_configuration)


crypto_factory = pe.CryptoFactory(kms_factory)
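
ut.InMemoryKmsClient is defined in src/utils.py and is not reproduced in this notebook; it presumably mirrors the in-memory test client from the pyarrow documentation, subclassing pe.KmsClient and implementing wrap_key/unwrap_key. A rough sketch of that pattern, for illustration only (not a secure KMS):

import base64


class InMemoryKmsClientSketch(pe.KmsClient):
    """Toy KMS client: master keys are read from KmsConnectionConfig.custom_kms_conf."""

    def __init__(self, kms_connection_configuration):
        pe.KmsClient.__init__(self)
        self.master_keys_map = kms_connection_configuration.custom_kms_conf

    def wrap_key(self, key_bytes, master_key_identifier):
        # "Wrap" the data key by prefixing it with the master key (demo only).
        master_key = self.master_keys_map[master_key_identifier].encode("utf-8")
        return base64.b64encode(master_key + key_bytes)

    def unwrap_key(self, wrapped_key, master_key_identifier):
        # Strip the master key prefix to recover the data key.
        master_key = self.master_keys_map[master_key_identifier].encode("utf-8")
        decoded = base64.b64decode(wrapped_key)
        return decoded[len(master_key):]
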
# Write with encryption properties
pa.parquet.write_table(
    table,
    filename_encrypted,
    encryption_properties=crypto_factory.file_encryption_properties(
        kms_connection_config, encryption_config
    ),
)
# Read with decryption properties
# NOTE: read_table does not work with encrypted files
pfile = pa.parquet.ParquetFile(
    filename_encrypted,
    decryption_properties=crypto_factory.file_decryption_properties(
        kms_connection_config
    ),
)
table_decrypted = pfile.read(use_threads=True)
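
Finally, as a usage sketch (not part of the original pipeline), the decrypted table can be converted back to pandas and compared with the dataframe we started from:

df_decrypted = table_decrypted.to_pandas()
print(df_decrypted.shape, df.shape)  # expect the same number of rows and columns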