API Reference¶
This page provides an auto-generated API reference for the key components of the hydrodatasource library.
Reader¶
hydrodatasource.reader.data_source.SelfMadeHydroDataset
¶
Bases: HydroData
A class for reading hydrological datasets that are not fully prepared, ready-made datasets, but rather data directories organized like a HydroDataset.
NOTE: We compile forcing data and attribute data into a directory organized like a ready-made dataset, such as Caravan. Only two directories are needed: attributes and timeseries.
__init__(data_path, dataset_name, time_unit=None, **kwargs)
¶
Initialize a self-made Caravan-style dataset.
Parameters¶
data_path : str
    The path to the parent directory of the custom-made data sources.
dataset_name : str
    The name of the SelfMadeHydroDataset, for example "googleflood" or "fdsources"; different datasets may use this same data-source class but have different dataset_name values.
time_unit : list, optional
    The time units to use, by default None.
kwargs : dict, optional
    Additional keyword arguments, by default None.
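A minimal usage sketch (an assumption about a concrete setup, not part of the generated reference): the data path is a placeholder, "googleflood" is one of the example dataset names mentioned above, and the ["1D"] time unit is illustrative.

```python
from hydrodatasource.reader.data_source import SelfMadeHydroDataset

# Placeholder path; it should be the parent directory containing the
# dataset's "attributes" and "timeseries" folders.
dataset = SelfMadeHydroDataset(
    data_path="/path/to/datasets-origin",
    dataset_name="googleflood",
    time_unit=["1D"],
)
print(dataset.get_attributes_cols())   # constant (attribute) variables
print(dataset.get_timeseries_cols())   # time-series variables
```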
cache_timeseries_xrdataset(**kwargs)
¶
Save all timeseries data in separate NetCDF files for each time unit.
Parameters¶
t_range : list, optional
    Time range for the data, by default ["1980-01-01", "2023-12-31"].
kwargs : dict, optional
    batchsize -- number of basins to process per batch, by default 100.
    time_units -- list of time units to process, by default None.
    start0101_freq -- for frequency setting; set True if the start date is 01-01, by default False.
    offset_to_utc -- whether to offset the time to UTC, by default False.
    start_hour_in_a_day -- the start hour in a day (0-23), by default 2, which means 2-5-8-11-14-17-20-23 UTC; Chinese basin data always use 08:00 Beijing Time, so the default is 2. Only applicable for sub-daily intervals (currently only "3h" is supported).
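Continuing the sketch above, a hypothetical cache call; the keyword values echo the documented defaults, and the time units are an assumption about what the dataset contains.

```python
# Write per-time-unit NetCDF caches; the values mirror the documented defaults.
dataset.cache_timeseries_xrdataset(
    t_range=["1980-01-01", "2023-12-31"],
    batchsize=100,
    time_units=["1D", "3h"],
    start_hour_in_a_day=2,  # 02:00 UTC start for 3h data, i.e. 08:00 Beijing Time
)
```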
cache_xrdataset(t_range=None, time_units=None)
¶
Save all data in a netcdf file in the cache directory
get_attributes_cols()
¶
Get the constant (attribute) columns in this data source.
get_timeseries_cols()
¶
Get the relevant (time-series) columns in this data source.
read_area(gage_id_lst=None)
¶
Read the area of each basin/unit.
read_attributes(object_ids=None, constant_cols=None, **kwargs)
¶
Read 2D data (site_num * var_num), i.e., non-time-series attribute data.
read_mean_prcp(gage_id_lst=None, unit='mm/d')
¶
Read the mean precipitation of each basin. The default unit is mm/d, but other units can be chosen and the values will be converted to the specified unit.
Parameters¶
gage_id_lst : list, optional
    The list of gage IDs, by default None.
unit : str, optional
    The unit of precipitation, by default "mm/d".
Returns¶
xr.Dataset
    The mean precipitation of each basin.
read_timeseries(object_ids=None, t_range_list=None, relevant_cols=None, **kwargs)
¶
Returns a dictionary containing data with different time scales.
Parameters¶
object_ids : list, optional
    List of object IDs. Defaults to None.
t_range_list : list, optional
    List of time ranges. Defaults to None.
relevant_cols : list, optional
    List of relevant columns. Defaults to None.
**kwargs : dict, optional
    Additional keyword arguments:
    time_units : list, optional
        List of time units to process.
    start0101_freq : bool, optional
        For frequency setting; set True if the start date is 01-01.
    offset_to_utc : bool, optional
        Whether to offset the time to UTC.
    start_hour_in_a_day : int, optional
        The start hour in a day for sub-daily intervals (0-23). Default is 2.
Returns¶
dict
    A dictionary containing data with different time scales.
read_ts_xrdataset(gage_id_lst=None, t_range=None, var_lst=None, **kwargs)
¶
Read time-series xarray dataset from multiple NetCDF files and organize them by time units.
Parameters¶
gage_id_lst : list
    List of gage IDs to select.
t_range : list
    List of two elements [start_time, end_time] to select the time range.
var_lst : list
    List of variables to select.
**kwargs
    Additional arguments.
Returns¶
dict
    A dictionary where each key is a time unit and each value is an xarray.Dataset containing the selected gage IDs, time range, and variables.
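Continuing the sketch above: the gage IDs, time range, and variable names below are placeholders, and the "1D" key assumes a daily time unit was cached.

```python
ts_dict = dataset.read_ts_xrdataset(
    gage_id_lst=["basin_001", "basin_002"],
    t_range=["2000-01-01", "2010-12-31"],
    var_lst=["prcp", "streamflow"],   # hypothetical variable names
)
daily_ds = ts_dict["1D"]              # one xarray.Dataset per time unit
print(daily_ds)
```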
Processor¶
Basin Mean Rainfall¶
hydrodatasource.processor.basin_mean_rainfall.basin_mean_func(df, weights_dict=None)
¶
Generic basin averaging method that supports both arithmetic mean and weighted mean (e.g. Thiessen polygon weights)
When some columns have missing values in a row, the function automatically switches to arithmetic mean for that row instead of using weights. This ensures robustness when dealing with incomplete data.
Parameters¶
df : DataFrame
    Time-series DataFrame for multiple stations, with station names as column names; each column should be a time series of rainfall data for a specific station.
weights_dict : dict, optional
    Dictionary with tuples of station names as keys and lists of weights as values. If None, the arithmetic mean is used.
    The station names in each key must be in the same order as the columns of df. An easy way to ensure this is to sort the columns of df and build the keys of weights_dict in the same order, for example:
    weights_dict = {
        ("st1", "st2", "st3", "st4"): [0.25, 0.5, 0.1, 0.15],
    }
    df = df[["st1", "st2", "st3", "st4"]]
    so that the keys of weights_dict are in the same order as the columns of df.
NOTE
We set the format of weights_dict this way because we want to extend it so that the matching key is chosen according to the missing-data situation: if a key in weights_dict matches the available columns of df, the corresponding weights are used; otherwise the arithmetic mean is used. This is a TODO item. For example, if the columns of df are ["st1", "st2", "st3", "st4"] and weights_dict is:
weights_dict = {
    ("st1", "st2", "st3", "st4"): [0.25, 0.5, 0.1, 0.15],
    ("st1", "st2", "st3"): [0.25, 0.5, 0.1],
    ("st3", "st4"): [0.1, 0.15],
}
then when st4 has missing data, the weights for ("st1", "st2", "st3") are used to compute the weighted mean; when st1 and st2 have missing data, the weights for ("st3", "st4") are used. Otherwise, the arithmetic mean is used.
This matching is not implemented yet: currently weights_dict only supports the case where its key covers all the columns of df, and if any column of df is missing, the function falls back to the arithmetic mean.
Returns¶
Series
    Basin-averaged time series.
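A small self-contained sketch of the weighted call described above; the station names and weights are taken from the example in the docstring, while the rainfall values are made up.

```python
import pandas as pd

from hydrodatasource.processor.basin_mean_rainfall import basin_mean_func

df = pd.DataFrame(
    {
        "st1": [1.0, 2.0, 0.0],
        "st2": [0.5, 1.5, 0.0],
        "st3": [2.0, 0.0, 1.0],
        "st4": [0.0, 1.0, 3.0],
    }
)
# The column order of df must match the key order of weights_dict.
df = df[["st1", "st2", "st3", "st4"]]
weights_dict = {("st1", "st2", "st3", "st4"): [0.25, 0.5, 0.1, 0.15]}

basin_mean = basin_mean_func(df, weights_dict=weights_dict)  # pandas Series
print(basin_mean)
```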
Rainfall-Runoff Event Identification¶
hydrodatasource.processor.dmca_esr.get_rr_events(rain, flow, basin_area, max_window=100, max_flow_min=None)
¶
Use the DMCA-ESR method to identify rainfall-runoff events.
Parameters¶
rain : xr.DataArray
    The rainfall data.
flow : xr.DataArray
    The streamflow data.
basin_area : xr.Dataset
    A dataset with a variable named "area" for each basin.
max_window : int
    The number of time intervals used to find events; default 100 (for hourly data).
max_flow_min : list
    The minimum of the maximum flow for each basin; values below this will not be considered when looking for an event; default 100 m^3/s.
Returns¶
dict
    The rainfall-runoff events for each basin.
Raises¶
ValueError
    Invalid unit format.
ValueError
    Unsupported unit.
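A hedged sketch of calling the event identification; how the rain and flow DataArrays are dimensioned (here time x basin), the coordinate names, and the random values are all assumptions for illustration.

```python
import numpy as np
import pandas as pd
import xarray as xr

from hydrodatasource.processor.dmca_esr import get_rr_events

times = pd.date_range("2020-01-01", periods=24 * 30, freq="h")
basins = ["basin_001"]
rng = np.random.default_rng(0)
rain = xr.DataArray(
    rng.gamma(0.5, 2.0, size=(len(times), len(basins))),
    coords={"time": times, "basin": basins},
    dims=["time", "basin"],
)
flow = xr.DataArray(
    rng.gamma(2.0, 50.0, size=(len(times), len(basins))),
    coords={"time": times, "basin": basins},
    dims=["time", "basin"],
)
basin_area = xr.Dataset({"area": ("basin", [1500.0])}, coords={"basin": basins})

# max_window and max_flow_min mirror the documented defaults (hourly data, 100 m^3/s).
rr_events = get_rr_events(rain, flow, basin_area, max_window=100, max_flow_min=[100.0])
```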
Cleaner¶
RainfallCleaner¶
hydrodatasource.cleaner.rainfall_cleaner.RainfallCleaner
¶
Bases: Cleaner
__init__(data_folder, output_folder)
¶
data_check_hourly_extreme(basin_id, climate_extreme_value=None, modify=False)
¶
Check if the daily precipitation values at chosen stations are within a reasonable range. Values larger than the climate extreme value are treated as anomalies. If no climate_extreme_value is provided, the maximum value in the data is used.
Parameters¶
climate_extreme_value : float, optional
    Climate extreme threshold for the region, calculated as 95% of the maximum observed DRP. If not provided, it will be calculated as 95% of the maximum DRP value in the data.
Returns¶
df_anomaly_stations_periods : pd.DataFrame
    DataFrame of anomalies with columns: 'STCD', 'TM', 'DRP'.
data_check_time_series(basin_id, check_type=None, gradient_limit=None, window_size=None, consistent_value=None, modify=False)
¶
Check daily precipitation values at chosen stations for gradient or time consistency anomalies.
Parameters¶
basin_id : str
    Basin ID.
check_type : str
    Type of check to perform: "gradient" for the gradient check, "consistency" for the time-consistency check.
gradient_limit : float, optional
    Maximum allowable gradient change in precipitation between consecutive days (used in the "gradient" check). Default is 10 mm.
window_size : int, optional
    Size of the window (in hours) to check for time consistency (used in the "consistency" check). Default is 24 hours.
consistent_value : float, optional
    The specific precipitation value to check for consistency (used in the "consistency" check). Default is 0.1 mm.
Returns¶
pd.DataFrame
    DataFrame of detected anomalies with columns: 'STCD', 'TM', 'DRP', 'Issue' (where applicable).
data_check_yearly(basin_id, year_range=None, diff_range=None, min_true_percentage=0.75, min_consecutive_years=3, modify=False)
¶
Compute the precipitation difference between remote-sensing data and station data, evaluate station reliability, and return the list of trustworthy stations.
Parameters¶
basin_id : str
    Basin ID.
year_range : list, optional
    The range of years to filter, by default [2010, 2024].
diff_range : list, optional
    The allowed ratio range between station data and remote-sensing (reanalysis) data; 0.5 means the station data is 0.5 times the reanalysis data, and 2.0 means the station data is 2 times the reanalysis data.
min_true_percentage : float, optional
    The minimum required proportion of trustworthy years, by default 0.75.
min_consecutive_years : int, optional
    The minimum number of consecutive trustworthy years, by default 3.
Returns¶
result_df : pd.DataFrame
    DataFrame of trustworthy stations, with columns 'STCD', 'Latitude', 'Longitude', and 'Reason'.
rainfall_clean(basin_id, **kwargs)
¶
The cleaning pipeline for station-gauged rainfall data.
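An illustrative run of the pipeline; the folder paths and basin ID are placeholders for a concrete deployment.

```python
from hydrodatasource.cleaner.rainfall_cleaner import RainfallCleaner

cleaner = RainfallCleaner(
    data_folder="/path/to/rainfall/raw",      # folder with the station CSV files
    output_folder="/path/to/rainfall/clean",  # where the cleaned data is written
)
cleaner.rainfall_clean("basin_001")           # run the full cleaning pipeline for one basin
```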
read_and_concat_csv(basin_id)
¶
Read and concatenate all CSV files in the folder.
ReservoirInflowBacktrack¶
hydrodatasource.cleaner.rsvr_inflow_cleaner.ReservoirInflowBacktrack
¶
Bases: Cleaner
__init__(data_folder, output_folder)
¶
Back-calculate the inflow of a reservoir.
Parameters¶
data_folder : str
    The folder containing the reservoir data.
output_folder : str
    Where we put the inflow data.
back_calculation(rsvr_id, clean_w_path, original_file, output_folder)
¶
Back-calculate inflow from reservoir storage data.
NOTE: each time step has three columns -- I, Q, W -- where I is the inflow, Q is the outflow, and W is the reservoir storage. Generally, in an SQL database, a timestamp marks the end of the preceding time period; in an hourly database, 13:00 means the 12:00-13:00 period, because the data is obtained at 13:00 (we cannot observe the future). Hence, for this function, W is the storage at the end of the time period, while I and Q are the inflow and outflow during the period. We therefore use the W of the previous time step as the initial storage of the period, so that I1 = Q1 + (W1 - W0). A minimal sketch of this water balance follows the Returns section below.
Parameters¶
rsvr_id : str
    The ID of the reservoir.
clean_w_path : str
    The path to the cleaned W-data file.
original_file : str
    The path to the original file.
output_folder : str
    Where to save the back-calculated data.
Returns¶
str
    The path to the result file.
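Not the library's implementation -- just a minimal pandas sketch of the water balance I1 = Q1 + (W1 - W0) described above, assuming hourly data with Q in m^3/s and W in m^3.

```python
import pandas as pd

df = pd.DataFrame(
    {
        "Q": [10.0, 12.0, 11.0, 9.0],            # outflow during each period, m^3/s
        "W": [1.00e6, 1.02e6, 1.01e6, 1.00e6],   # storage at the END of each period, m^3
    },
    index=pd.date_range("2021-07-01 01:00", periods=4, freq="h"),
)
dt = 3600.0  # seconds in one hourly period
# The previous time step's W serves as the initial storage of the period,
# so I_t = Q_t + (W_t - W_{t-1}) / dt; the first period has no W0 and stays NaN.
df["I"] = df["Q"] + df["W"].diff() / dt
print(df)
```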
clean_w(rsvr_id, file_path, output_folder, fit_method='quadratic', zw_curve_std_times=3.0, remove_zw_outliers=False)
¶
Remove abnormal reservoir capacity data
Parameters¶
rsvr_id : str
    The ID of the reservoir.
file_path : str
    Path to the input file.
output_folder : str
    Path to the output folder.
fit_method : str, optional
    Z-W curve fitting method, by default "quadratic". TODO: more methods need to be supported; the "power" method also needs to be debugged.
zw_curve_std_times : float, optional
    The number of standard deviations used to remove outliers, by default 3.
remove_zw_outliers : bool, optional
    Whether to remove outliers for the Z-W curve fitting, by default False.
Returns¶
str
    Path to the cleaned data file.
delete_negative_inq(rsvr_id, inflow_data_path, original_file, output_folder, negative_deal_window=7, negative_deal_stride=4)
¶
Remove negative inflow values with a rolling window. Negative values are adjusted together with the positive ones so that the total inflow stays consistent: for example, 1, -1, 1, -1 will be adjusted to 0, 0, 0, 0 so that the water balance is kept. Note that because the window moves with a stride, the final few values may not be adjusted. A toy sketch of the balancing idea follows the Returns section below.
Parameters¶
rsvr_id : str
    The ID of the reservoir.
inflow_data_path : str
    The data file produced by back_calculation.
original_file : str
    The original file.
output_folder : str
    Where to save the data.
negative_deal_window : int, optional
    The window used to deal with negative values, by default 7.
negative_deal_stride : int, optional
    The stride of the window, by default 4.
Returns¶
str
    The path to the result file.
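Not the library's rolling-window algorithm -- a toy sketch of the balancing idea referenced above: within a window, clip negatives to zero and rescale the remaining values so the window total (and thus the water balance) is preserved.

```python
import numpy as np

def balance_window(inflow):
    """Toy redistribution of negative inflows inside one window."""
    inflow = np.asarray(inflow, dtype=float)
    total = inflow.sum()                   # volume to preserve
    clipped = np.clip(inflow, 0.0, None)   # drop the negative inflows
    if clipped.sum() == 0.0:
        return clipped                     # nothing positive to redistribute
    return clipped * max(total, 0.0) / clipped.sum()

print(balance_window([1.0, -1.0, 1.0, -1.0]))  # -> [0. 0. 0. 0.]
```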
insert_inq(rsvr_id, inflow_data_path, original_file, output_folder)
¶
Resample the inflow data to hourly resolution, since the original data is not strictly hourly, and fill in the inflow (INQ) values with linear interpolation.
Parameters¶
rsvr_id : str
    The ID of the reservoir.
inflow_data_path : str
    The data file produced after deleting negative inflow values.
original_file : str
    The original file.
output_folder : str
    Where to save the data.
Returns¶
str
    The path to the result file.
StreamflowCleaner¶
hydrodatasource.cleaner.streamflow_cleaner.StreamflowCleaner
¶
Bases: Cleaner
FFT(streamflow_data)
¶
Apply iterative Fourier filtering to the streamflow data, including non-negativity adjustment and total-flow-volume adjustment.
:cutoff_frequency: Cutoff frequency of the Fourier filter.
:time_step: Sampling interval of the data.
:iterations: Number of iterations.
data_balanced(origin_data, transform_data)
¶
Apply a total-volume balance transformation to one-dimensional streamflow data.
:origin_data: The original one-dimensional streamflow data.
:transform_data: The smoothed, transformed one-dimensional streamflow data.
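A toy illustration (not the library's code) of the total-balance idea that the smoothing methods share: rescale the smoothed series so its total matches the original total.

```python
import numpy as np

origin = np.array([5.0, 80.0, 40.0, 10.0, 5.0])              # raw streamflow
smoothed = np.convolve(origin, np.ones(3) / 3, mode="same")  # any smoother will do
balanced = smoothed * origin.sum() / smoothed.sum()          # restore the total volume
print(origin.sum(), balanced.sum())                          # the totals now match
```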
kalman_filter(streamflow_data)
¶
Apply a Kalman filter to smooth the streamflow data while keeping the total flow volume balanced.
:param streamflow_data: The original streamflow data.
lowpass_filter(streamflow_data)
¶
Apply an adjusted low-pass filter to one-dimensional streamflow data.
:cutoff_frequency: Cutoff frequency of the low-pass filter.
:sampling_rate: Sampling rate of the data.
:order: Order of the filter, 5 by default.
moving_average(streamflow_data)
¶
Apply a moving average to smooth the streamflow data while keeping the total flow volume balanced.
:param streamflow_data: The input streamflow data array.
:return: The smoothed streamflow data.
moving_average_difference(streamflow_data)
¶
Apply the moving-average-difference algorithm to smooth the streamflow data while keeping the total flow volume balanced.
:window_size: Size of the moving window.
robust_fitting(streamflow_data, k=1.5)
¶
Apply a robust-fitting correction algorithm to smooth the streamflow data while keeping the total flow volume balanced. By default a quadratic curve is used for the fitting; this algorithm's performance is relatively poor.
wavelet(streamflow_data)
¶
Apply a wavelet transform to one-dimensional streamflow data, extending the data at both ends before the analysis to reduce edge distortion, then adjust the total flow volume.
:cwt_row: The specific width used in the wavelet transform.
WaterlevelCleaner¶
hydrodatasource.cleaner.waterlevel_cleaner.WaterlevelCleaner
¶
Bases: Cleaner