Dai 数据管理 API
由small_q创建,最终由small_q 被浏览 6 用户
1 模块介绍
本模块提供数据查询、数据源读写、自定义函数等功能,支持远程查询和 BDB 数据管理
2 API 介绍
2.1 自定义 UDF 函数
类名: dai.DaiUDF
功能: 自定义 UDF 函数定义
参数:
name: str,必填,UDF 函数名称function: Callable,必填,Python 函数对象parameters: Optional[List],非必填,参数类型列表,默认为 Nonereturn_type: Optional[Any],非必填,返回值类型,默认为 Nonetype: Optional[Any],非必填,UDF 类型,默认为 Nonenull_handling: Optional[Any],非必填,NULL 值处理方式,默认为 Noneexception_handling: Optional[Any],非必填,异常处理方式,默认为 Noneside_effects: bool,非必填,是否有副作用,默认为 False
说明: 用于在 dai.query() 中传递自定义函数,SDK 会自动序列化函数代码并在云端执行
2.2 远程查询数据
方法名:dai.query()\n功能:远程查询\n参数:
sql: str,必填,mysql 查询语句udf_list:List[DaiUDF],非必填,UDF 函数列表,默认为 [ ]full_db_scan: bool,非必填, 是否允许全表查询,默认为 Falsefilters:Dict[str, List[Any]],非必填,过滤条件 {"column": ["value1", "value2"]},默认为 {}bind_relations:Dict[str, Any],非必填,绑定本地数据到 SQL 查询 {"name": DataFrame/Table},默认为 Noneparams:Dict[str, Any],非必填,查询参数 {"param_name": "value"},默认为 Nonecompression:bool,非必填,是否启用字符串压缩,默认为 Falseresource_spec_id:str,指定 AIStudio 资源规格 ID(SDK 专属),默认为 D0(1C/6G) 免费space_id:str,指定 AIStudio 空间 ID(SDK 专属),默认为 主空间
返回:QueryResult 对象
2.3 数据源管理
类名:dai.DataSource
功能:数据源管理
参数:
datasource_id: str,必填,数据源 ID
方法:见下
2.3.1 读取 BDB 数据
方法名: dai.DataSource.read_bdb()
功能: 读取 BDB 数据
参数:
as_type: Type,非必填,返回类型,默认为 pa.Table,支持 pd.DataFrame, pa.Tablepartition_filter: Optional[Dict[str, Union[tuple, set]]],非必填,分区过滤条件,默认为 Nonetuple: 表示范围,如 ("2024-01-01", "2024-12-31")set: 表示特定值,如 {"000001.SZ", "600000.SH"}
columns: Optional[List[str]],非必填,要读取的列名列表,默认为 None(读取所有列)
返回: 根据 as_type 参数返回 pd.DataFrame 或 pa.Table
2.3.2 写入 BDB 数据
方法名:dai.DataSource.write_bdb()
功能: 写入 BDB 数据
参数:
• data : Union[pd.DataFrame, pa.Table],必填,要写入的数据
• update_logs : Optional[Union[bool, Dict]],非必填,是否记录更新日志,默认为 None
• update_msg : Optional[str],非必填,更新备注信息,默认为 None
• id : Optional[str],非必填,数据源 ID,默认为 None(创建临时数据源)
• partitioning : Optional[List[str]],非必填,分区列,默认为 None
• indexes : Optional[List[str]],非必填,索引列,默认为 None
• excludes : Optional[Set[str]],非必填,排除字段,默认为 None
• unique_together : Optional[List[str]],非必填,唯一约束,默认为 None
• on_duplicates : str,非必填,冲突处理策略,默认为 "last",可选值:["last", "first", "error", "none"]
• sort_by : Optional[List[Tuple[str, str]]],非必填,排序,格式为 [("field", "ascending/descending"), ...],默认为 None
• preserve_pandas_index : bool,非必填,是否保留 pandas 索引,默认为 False
• docs : Optional[Dict[str, Any]],非必填,文档,默认为 None
• timeout : int,非必填,写入锁的超时时间(秒),默认为 300
• extra : str,非必填,额外信息,默认为 ""
• base_ds : Optional[DataSource],非必填,继承 extra 参数,默认为 None
• overwrite : bool,非必填,是否覆盖已有数据,默认为 False
• max_threads : Optional[int],非必填,最大线程数,默认为 None
• preserve_order : bool,非必填,是否保持顺序,默认为 False
返回:DataSource 对象
2.4 查询结果
类名:dai.QueryResult
功能:查询结果包装类,提供多种数据格式转换方法
方法:见下
2.4.1 获取 Arrow Table
方法名:dai.QueryResult.arrow()
功能:返回 Arrow Table 格式的查询结果
参数:无
返回:pa.Table 对象
2.4.2 转换为 pandas DataFrame
方法名:dai.QueryResult.df()
功能:转换为 pandas DataFrame
参数:无
返回:pd.DataFrame 对象
2.4.3 转换为 Polars DataFrame
方法名:dai.QueryResult.pl()
功能:转换为 Polars DataFrame
参数:无
返回:polars.DataFrame 对象
2.4.4 获取所有行
方法名:dai.QueryResult.fetchall()
功能:获取所有行为列表
参数:无
返回:list,所有行的列表
2.4.5 获取流式读取器
方法名:dai.QueryResult.fetch_arrow_reader()
功能:获取 Arrow 流式读取器,用于大数据场景分批读取
参数:
batch_size: int,非必填,每批数据的行数,默认为 1000000
返回:pyarrow.RecordBatchReader 对象