DataCatalog: meteocat_xema

import src.utils as ut

# Set up the root path of the application
project_path = ut.project_path()

# Load the metadata

meta_filename = [
    f"{ut.project_path(1)}/meta/environment/meteocat_xema.json",
    f"{ut.project_path(2)}/meta_ipynb/meteocat_xema.html",
]
metadata = ut.load_metadata(meta_filename)

# Get contentUrl from metadata file
ut.info_meta(metadata)

1. Distribution of the entire dataset via CSV download

First we need to download the whole dataset as a CSV-file.

# Get the dataset meteocat_xema_data from the meteocat_xema catalog
contentUrl, dataset_name, distr_name = ut.get_meta(
    metadata, idx_distribution=0, idx_hasPart=0
)

# Make folders for data download
path = f"{project_path}/data/{dataset_name}/{distr_name}"
ut.makedirs(path)
# Download the dataset file (big files can take a while to download)
filename = f"{path}/dataset.csv"
ut.download_file(contentUrl, filename, method="curl")
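Before moving on, it can help to peek at the first few rows to confirm the delimiter and column names; a minimal sketch that only reads the head of the file:

import pandas as pd

# Read only the first rows of the big CSV-file for a quick sanity check
preview = pd.read_csv(filename, nrows=5)
print(preview.columns.tolist())
preview.head()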

1.1. Copy from CSV to PostgreSQL database

Here we assume that a PostgreSQL database server is available on the user’s machine. From contentUrl we downloaded the dataset; for convenience, we renamed it dataset.csv. We use the following SQL code to create a new table and copy the CSV content into it. Note that some rows of the CSV-file could be badly formatted, in which case the copy operation exits with an error, so it is important to check the CSV-file and, if needed, correct it by dropping the inconsistent rows.

  DROP TABLE IF EXISTS xema;

  CREATE TABLE IF NOT EXISTS xema (
      ID text,
      CODI_ESTACIO text,
      CODI_VARIABLE text,
      DATA_LECTURA timestamp,
      DATA_EXTREM timestamp,
      VALOR_LECTURA FLOAT8,
      CODI_ESTAT text,
      CODI_BASE text
  );

  COPY xema FROM './[path]/dataset.csv' DELIMITER ',' CSV HEADER;
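If the COPY fails because of malformed rows, one option is to keep only the rows whose field count matches the header and then load the cleaned file from the client side with psycopg2's copy_expert. This is only a sketch: the connection parameters and the cleaned-file name are assumptions, not part of the catalog.

import csv
import psycopg2

src = filename                     # the downloaded dataset.csv
dst = f"{path}/dataset_clean.csv"  # assumed name for the cleaned copy

# Keep only rows with the same number of fields as the header
with open(src, newline="") as fin, open(dst, "w", newline="") as fout:
    reader = csv.reader(fin)
    writer = csv.writer(fout)
    header = next(reader)
    writer.writerow(header)
    for row in reader:
        if len(row) == len(header):
            writer.writerow(row)

# Client-side load, so the file does not need to sit on the database server
# (connection parameters below are placeholders)
conn = psycopg2.connect(dbname="meteocat", user="postgres", host="localhost")
with conn, conn.cursor() as cur, open(dst) as f:
    cur.copy_expert("COPY xema FROM STDIN WITH (FORMAT csv, HEADER true)", f)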

At this point the dataset has been imported into PostgreSQL, but the attribute data types can be redefined to save memory and speed up filtering operations.

   ALTER TABLE xema
      ALTER COLUMN id TYPE VARCHAR (14),
      ALTER COLUMN codi_estacio TYPE VARCHAR (2),
      ALTER COLUMN codi_variable TYPE INT USING codi_variable::integer,
      ALTER COLUMN valor_lectura TYPE FLOAT4,
      ALTER COLUMN codi_estat TYPE VARCHAR (1),
      ALTER COLUMN codi_base TYPE VARCHAR (2);
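Filtering by station can be sped up further with an index on codi_estacio; a sketch, again using placeholder psycopg2 connection parameters:

import psycopg2

# Connection parameters are placeholders
conn = psycopg2.connect(dbname="meteocat", user="postgres", host="localhost")
with conn, conn.cursor() as cur:
    # Speeds up WHERE codi_estacio = '...' lookups
    cur.execute("CREATE INDEX IF NOT EXISTS idx_xema_codi_estacio ON xema (codi_estacio);")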

Finally, assume that we are interested in the weather station with code ‘KP’. Below, we select that station’s rows from the xema table and export them to a CSV file.

   COPY (
      SELECT codi_variable, data_lectura, data_extrem, valor_lectura, codi_estat
      FROM xema
      WHERE codi_estacio = 'KP')
      TO '/path/kp_station.csv' DELIMITER ',' CSV HEADER;

1.2. Distributed reading of big CSV files

An alternative for querying the downloaded CSV dataset directly is the Dask Python library. Dask enables parallel computing by chunking the file read. Here we use Dask on a single machine (our personal computer), but it can easily be set up to work on a distributed cluster.

First we start a Dask client and set it up to work with 4 threads and a memory limit of 1 GB:

from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd

client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit="1GB")
client
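The client also exposes a diagnostics dashboard (if the optional bokeh dependency is installed), which is handy for watching memory usage and task progress while the query below runs:

# URL of the local Dask diagnostics dashboard
print(client.dashboard_link)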

At this point we can use the Dask dataframe in much the same way as a pandas dataframe. Note that the read_csv call below does not actually read the data yet.

dtype = {
    "CODI_ESTACIO": "category",
    "CODI_VARIABLE": "float",
    "VALOR_LECTURA": "float",
    "CODI_ESTAT": "category",
    "CODI_BASE": "category",
}

df = dd.read_csv(filename, parse_dates=["DATA_LECTURA", "DATA_EXTREM"], dtype=dtype)
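Because the read is lazy, only cheap metadata is available right away; a small peek such as .head() reads just the first chunk instead of the whole file:

# Number of chunks (partitions) Dask will split the CSV-read into
print(df.npartitions)
# Reads only the first partition, not the full dataset
df.head()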

Since Dask works with lazy computation, we have to specify when we actually want to compute the chain of operations we have set up; Dask first builds a directed acyclic graph (DAG) of tasks and only runs it on demand. For example, to query two weather stations from the dataset we trigger the computation with the .compute() operation:

res = df.query("(CODI_ESTACIO == 'UR' or CODI_ESTACIO == 'KP')").compute()

Note that with the above Dask client settings (computation resources) the query operation runs for a couple of hours. The SQL approach, on the other hand, is much faster even though it runs on a single core, thanks to its internal optimizations (compiled code and indexes). The strength of Dask is that it is easy to scale up when more computing power is available.
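Since the query above is expensive with these resources, it is worth persisting its result so that later analyses can reread it cheaply; a minimal sketch (the output filename is an assumption, and a parquet engine such as pyarrow is required):

# Save the filtered stations so the long Dask query need not be rerun
res.to_parquet(f"{path}/ur_kp_stations.parquet")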

2. Distribution from Socrata API

This example shows how to get weather station data from the Socrata API and store it in a pandas dataframe. No storage capacity or SQL server setup is needed here, since the database and the query operations are managed by the API’s web server.

from sodapy import Socrata
import pandas as pd
import gc
from concurrent.futures import ThreadPoolExecutor

First we get the list of available stations of the XEMA network.

# Get the dataset meteocat_xema_stations from the API end-point distribution
contentUrl, dataset_name, distr_name = ut.get_meta(
    metadata, idx_distribution=1, idx_hasPart=1
)

# Make folders for data download
path = f"{project_path}/data/{dataset_name}/{distr_name}"
ut.makedirs(path)

Since we use the Socrata API library to request data, we need to provide the source domain and a dataset identifier. The API will warn us if we don’t provide an app-token, but for this dataset there is actually no limitation on data access.

# Get the domain and dataset identifier from metadata
url_split = contentUrl.split("/")
domain = url_split[2]
dataset_id = url_split[-1].split(".")[0]
print(f'Domain is "{domain}" with dataset identifier "{dataset_id}".')

# Setup a client to a Socrata domain
client = Socrata(domain, None, timeout=10)
# Request the list of stations from the end-point
stations = client.get(dataset_id)
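If an application token has been registered, it can be passed to the client to avoid the warning and request throttling; the token string below is just a placeholder:

# Optional: authenticate the client with an app-token (placeholder value)
app_token = "MY_APP_TOKEN"
client_with_token = Socrata(domain, app_token, timeout=10)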

We can display the list of all available weather stations as a dataframe and save it to a CSV file.

df_stations = pd.DataFrame.from_records(stations, coerce_float=True)
df_stations.head()
# Save on CSV-file
filename = f"{path}/dataset.csv"
df_stations.to_csv(filename)

Each station has a variety of measurement devices, which are listed in the meteocat_xema_variables dataset.

# Get the dataset meteocat_xema_variables from the API end-point distribution
contentUrl, dataset_name, distr_name = ut.get_meta(
    metadata, idx_distribution=1, idx_hasPart=2
)

# Make folders for data download
path = f"{project_path}/data/{dataset_name}/{distr_name}"
ut.makedirs(path)
# Get the dataset identifier from metadata
url_split = contentUrl.split("/")
dataset_id = url_split[-1].split(".")[0]
print(f'Domain is "{domain}" with dataset identifier "{dataset_id}".')
# Get data that describes measurement devices
variables = client.get(dataset_id)
df_variables = pd.DataFrame.from_records(variables, coerce_float=True)
df_variables.head()
# Save on CSV-file
filename = f"{path}/dataset.csv"
df_variables.to_csv(filename)

At this point, assume that we are interested in getting data for the weather station with code KP, located at Fogars de la Selva. For this we need the dataset identifier of meteocat_xema_data.

df_stations.query("codi_estacio == 'KP'")
# Get the dataset meteocat_xema_data from the API end-point distribution
contentUrl, dataset_name, distr_name = ut.get_meta(
    metadata, idx_distribution=1, idx_hasPart=0
)

# Make folders for data download
path = f"{project_path}/data/{dataset_name}/{distr_name}"
ut.makedirs(path)
# Get the dataset identifier from metadata
url_split = contentUrl.split("/")
dataset_id = url_split[-1].split(".")[0]
print(f'Domain is "{domain}" with dataset identifier "{dataset_id}".')

It is a good idea to check the metadata of the dataset. In our case we check the total number of records in the meteocat_xema_data dataset.

# Get metadata and max number of records available
meta = client.get_metadata(dataset_id)
cachedContents = meta["columns"][0]["cachedContents"]
limit = int(cachedContents["non_null"]) + int(cachedContents["null"])
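For example, we can report the total before deciding how many records to request:

print(f"The {dataset_id} dataset holds {limit} records in total.")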

Once we choose one or more weather stations to query, we are ready to get a JSON response from the API. We specify how many records we want with the limit parameter; if we do not set limit, the API returns at most the first 1000 records.
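Very large requests can also be split into pages with the offset parameter, which keeps memory usage bounded; a sketch with an arbitrary page size, reusing the client and dataset_id defined above:

# Fetch records in pages to keep memory usage bounded (page size is arbitrary)
page_size = 50000
frames = []
for offset in range(0, limit, page_size):
    page = client.get(dataset_id, where="codi_estacio = 'KP'",
                      limit=page_size, offset=offset)
    if not page:
        break
    frames.append(pd.DataFrame.from_records(page))
df_pages = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()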

If we would like to get data for a set of stations, it is more convenient to set up threaded API calls. It is possible to perform a single query with an SQL statement like IN ('KP', 'J5', ...), but the resulting dataframe could exceed the available memory.



def getData(dataset_id, code, min_date="", limit=1000, path=None):

    # Query the data by station code and, if given, by datetime
    if min_date != "":
        where = f"codi_estacio = '{code}' AND data_lectura > '{min_date}'"
    else:
        where = f"codi_estacio = '{code}'"

    try:
        records = client.get(dataset_id, where=where, limit=limit)
    except Exception:
        if min_date == "":
            raise
        # Retry without the datetime filter; it is applied again locally below
        print(f"\n Station {code}: retry without datetime filter")
        where = f"codi_estacio = '{code}'"
        records = client.get(dataset_id, where=where, limit=limit)

    df = pd.DataFrame.from_records(records, coerce_float=True)

    # Convert to proper datatype
    column_types = {
        "data_lectura": "datetime64",
        "data_extrem": "datetime64",
        "codi_estacio": "category",
        "codi_estat": "category",
        "codi_base": "category",
        "codi_variable": "int",
        "valor_lectura": "float",
    }
    df = df.astype(column_types)

    # Apply datetime filter
    if min_date != "":
        df = df.query(f"data_lectura > '{min_date}'")

    if path is not None:
        # Save on CSV-file and parquet (note the storage saving with parquet)
        filename = f"{path}/dataset_{code}"
        df.to_csv(filename + ".csv")
        df.to_parquet(filename + ".parquet")
        print(f"Saved on {filename}")
        gc.collect()

    return df
# For a single station we simply call getData
min_date = "2020-01-01"
codes = ["XV"]
df = getData(dataset_id, code=codes[0], min_date=min_date, limit=limit, path=path)
df.info()
# But for a set of stations we can run threaded API calls
min_date = "2020-01-01"
codes = ["KP", "J5", "UC", "XZ", "DO", "UB", "U2", "DJ", "WT", "D4", "DF", "W1", "XJ"]

# Use only a few workers to prevent memory overflow
with ThreadPoolExecutor(max_workers=4) as executor:
    for code in codes:
        executor.submit(getData, dataset_id, code, min_date, limit, path)

Note that here the measured variables are identified by their codes; the names must be retrieved from the meteocat_xema_variables dataset using its own dataset identifier. Below is an example of how the measurement codes can be renamed with their corresponding name descriptions.

# Pivot to get time-value table for each measurement device
df_sub = df[["data_lectura", "codi_variable", "valor_lectura"]]
# Drop duplicated (datetime, variable) pairs so that pivot does not fail
df_sub = df_sub[~df_sub[["data_lectura", "codi_variable"]].duplicated()]
df_sub = df_sub.pivot(index="data_lectura", columns="codi_variable")
df_sub.head()
# Make a dictionary of code to variable names
df_variables["nom_variable_unitat"] = (
    df_variables["nom_variable"] + " (" + df_variables["unitat"] + ")"
)
dict_var = df_variables.set_index("codi_variable")["nom_variable_unitat"].to_dict()

# For each weather station, save data in a CSV-file
for code in codes:
    df = pd.read_parquet(f"{path}/dataset_{code}.parquet")
    # Select a station and pivot
    df_station = df.query(f"codi_estacio == '{code}'")
    df_station = df_station[~df_station[["data_lectura", "codi_variable"]].duplicated()]
    df_station = df_station.pivot(index="data_lectura", columns="codi_variable", values="valor_lectura")
    # Change column type to string
    df_station.columns = df_station.columns.map(str)
    df_station = df_station.rename(columns=dict_var)
    # Save table in a CSV-file
    df_station.to_csv(f"{path}/{code}_station.csv", sep=",")

Finally, we plot the last station from the above loop to get a visual idea of the data.

p = df_station.plot(figsize=(10, 10), title=f"Weather station with code {code}")
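The plot can also be written to disk for reporting; the output filename is an assumption:

# Save the figure next to the station CSV-files
p.get_figure().savefig(f"{path}/{code}_station.png", dpi=150, bbox_inches="tight")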