Source code for tsbenchmark.api

import json
import os
import time

import pandas as pd
from typing import Dict

from tsbenchmark.tasks import TSTask
from tsbenchmark.consts import DEFAULT_REPORT_METRICS, DEFAULT_GLOBAL_RANDOM_STATE, TASK_MODE_LOCAL, SUB_RESULT_MAX
from tsbenchmark import tasks


# __all__ = ['get_task', 'get_local_task', 'send_report_data']


def get_task() -> TSTask:
    """Build the TSTask assigned to the current benchmark job.

    The job parameters are read through the hyperctl API, the matching task
    config is looked up with ``tasks.get_task_config`` and wrapped in a
    TSTask. ``TSTask.ready()`` is called before the task is returned, which
    (per the TSTask contract) initialises the start time and loads the data.

    See Also:
        TSTask: Player gets the data and metadata from the TSTask and then
            runs its algorithm to compete.

    Notes:
        1. Attribute descriptions are available on TSTask.
        2. The report supports 'smape', 'mape', 'mae' and 'rmse'.

    Returns:
        TSTask, the task from which the player gets the data and metadata.
    """
    # Imported lazily so the module can be imported without a hyperctl job.
    from hypernets.hyperctl import api as hyperctl_api
    from tsbenchmark.players import JobParams

    params = JobParams(**hyperctl_api.get_job_params())
    config = tasks.get_task_config(params.task_config_id, cache_path=params.dataset_cache_path)

    task = TSTask(
        task_config=config,
        random_state=params.random_state,
        max_trials=params.max_trials,
        reward_metric=params.reward_metric,
    )
    task.ready()
    return task
def get_local_task(data_path, dataset_id='512754', random_state=DEFAULT_GLOBAL_RANDOM_STATE, max_trials=3,
                   reward_metric='smape') -> TSTask:
    """Build a TSTask from local data, for developing and testing a new player.

    A task config is loaded with ``TSTaskLoader(data_path).load(dataset_id)``
    and wrapped in a TSTask. ``TSTask.ready()`` is called before returning,
    and the task is flagged with the ``TASK_MODE_LOCAL`` attribute so that
    ``send_report_data`` validates the result locally instead of posting it.

    Args:
        data_path: str, required. Local path handed to TSTaskLoader, used to
            cache the data (e.g. '~/tmp/data_cache').
        dataset_id: str, default='512754'. The unique id of a dataset task.
            You can get it from tests/dataset_desc.csv.
        random_state: int, default=consts.DEFAULT_GLOBAL_RANDOM_STATE.
            Determines random number for the automl framework.
        max_trials: int, default=3. Maximum number of tests for the automl
            framework, optional.
        reward_metric: str, default='smape'. The optimize direction for model
            selection. Hypernets search reward metric name or callable.
            Possible values: 'accuracy', 'auc', 'mse', 'mae', 'rmse', 'mape',
            'smape', and 'msle'.

    Notes:
        1. Attribute descriptions are available on TSTask.
        2. The report supports 'smape', 'mape', 'mae' and 'rmse'.

    See Also:
        TSTask: Player gets the data and metadata from the TSTask and then
            runs its algorithm to compete.

    Returns:
        TSTask, the task from which the player gets the data and metadata.
    """
    from tsbenchmark.tasks import TSTask
    from tsbenchmark.tsloader import TSTaskLoader

    config = TSTaskLoader(data_path).load(dataset_id)
    local_task = TSTask(config, random_state=random_state, max_trials=max_trials,
                        reward_metric=reward_metric)
    local_task.ready()

    # Marker attribute checked by send_report_data to skip the server call.
    setattr(local_task, TASK_MODE_LOCAL, True)
    return local_task
def report_task(report_data: Dict, bm_task_id=None, api_server_uri=None):
    """Post metrics or running information to the api server.

    Args:
        report_data: Dict. The report data generated by send_report_data.
        bm_task_id: str, optional. BenchmarkTask id; when None it is taken
            from the current job.
        api_server_uri: str, optional. tsbenchmark api server uri; when None
            it is taken from the environment.

    Raises:
        AssertionError: if no task id or no server uri could be resolved.
    """
    bm_task_id = _get_bm_task_id(bm_task_id)
    assert bm_task_id

    api_server_uri = _get_api_server_api(api_server_uri)
    assert api_server_uri

    report_url = f"{api_server_uri}/tsbenchmark/api/benchmark-task/{bm_task_id}/report"

    from hypernets.hyperctl import utils

    # The server expects the report wrapped under a top-level 'data' key.
    utils.post_request(report_url, json.dumps({'data': report_data}))
def send_report_data(task: TSTask, y_pred: pd.DataFrame, key_params='', best_params='', sub_result=False):
    """Validate a prediction, build the report data and send it.

    Steps:
        1. Check ``y_pred`` against the task's test data and compute the
           metrics with ``tsbenchmark.util.cal_task_metrics``.
        2. Assemble the report dict accepted by ``report_task``.
        3. For a server task, call ``report_task``. For a task flagged as
           local (see ``get_local_task``) nothing is posted; the successful
           validation is logged instead.

    Args:
        task: TSTask, the task the prediction belongs to.
        y_pred: pandas.DataFrame, the values predicted by the player. It must
            contain the columns listed in ``task.series_name`` and have as
            many rows as ``task.get_test()``.
        key_params: str, default=''. The params the user wants to save to the
            report data.
        best_params: str, default=''. The best model's params. For automl many
            models are trained; assign this to keep the best ones.
        sub_result: bool, default=False. Stored in the report as
            ``str(sub_result)``. In local mode it also selects whether the
            call counts as one of at most ``SUB_RESULT_MAX`` sub results or
            as the single final result.

    Raises:
        AssertionError: if ``y_pred`` is None or not a pandas.DataFrame.
        Exception: if the row count differs from the test data or a series
            name is missing from the columns of ``y_pred``.

    Notes:
        When developing a new player locally, this method helps validate the
        prediction and params. In local mode the process is terminated with
        ``os._exit(0)`` after a final (non sub) result, or once
        ``SUB_RESULT_MAX`` sub results have been sent.
    """

    def _records_text(frame):
        # JSON records without the enclosing brackets, with records separated
        # by a space instead of a comma.
        return frame.to_json(orient='records')[1:-1].replace('},{', '} {')

    task._end_time = time.time()
    target_metrics = DEFAULT_REPORT_METRICS

    assert y_pred is not None
    assert isinstance(y_pred, pd.DataFrame)

    expected_rows = task.get_test().shape[0]
    if y_pred.shape[0] != expected_rows:
        raise Exception(
            "The result should have {} rows but got {}. ".format(expected_rows, y_pred.shape[0]))

    if any(key not in y_pred.columns for key in task.series_name):
        raise Exception(f"Series names does not exists in the columns of predict result."
                        f" You can get it from task.series_name.\n The Series names are {task.series_name} ")

    from tsbenchmark.util import cal_task_metrics

    task_metrics = cal_task_metrics(y_pred, task.get_test()[task.series_name], task.date_name,
                                    task.series_name, task.covariables_name, target_metrics, 'regression')

    report_data = {
        # Wall time since the task started, excluding the data download.
        'duration': time.time() - task.start_time - task.download_time,
        'y_predict': _records_text(y_pred[task.series_name]),
        'y_real': _records_text(task.get_test()[task.series_name]),
        'metrics': task_metrics,
        'key_params': key_params,
        'best_params': best_params,
        'sub_result': str(sub_result)
    }

    if not hasattr(task, TASK_MODE_LOCAL):
        report_task(report_data)
        return

    # Local test mode: nothing is posted, the result is only validated.
    from hypernets.utils import logging as hyn_logging

    hyn_logging.set_level(hyn_logging.DEBUG)
    logger = hyn_logging.get_logger(__name__)
    logger.info("Successfully validation for local test mode.")

    # NOTE(review): `logger` is bound only on this local path, so the
    # sub-result bookkeeping below is kept on the same path.
    if sub_result:
        # First sub result creates the counter, later ones increment it.
        task.sub_result_count = getattr(task, "sub_result_count", 0) + 1
        if task.sub_result_count >= SUB_RESULT_MAX:
            logger.info(
                f"Exit with {task.sub_result_count} sub_result have been sended. The maximum counts of sub_result is {SUB_RESULT_MAX}.")
            os._exit(0)
    else:
        logger.info(f"Exit with sub_result = {sub_result}, none sub_result support only 1s result.")
        os._exit(0)
def _get_api_server_api(api_server_uri=None): if api_server_uri is None: from hypernets.hyperctl import consts api_server_portal = os.getenv(consts.KEY_ENV_SERVER_PORTAL) assert api_server_portal return api_server_portal else: return api_server_uri def _get_bm_task_id(bm_task_id): if bm_task_id is None: from hypernets.hyperctl import api as hyperctl_api from tsbenchmark.players import JobParams hyperctl_job_params = hyperctl_api.get_job_params() job_params = JobParams(**hyperctl_job_params) return job_params.bm_task_id else: return bm_task_id