Metadata-Version: 2.1
Name: py-bigdata-util
Version: 0.1.0
Summary: Bigdata Utility Code.
Home-page: https://gitee.com/albin3/py-bigdata-util
Author: Albin
Author-email: binwei.zeng3@gmail.com
Maintainer: albin3
Maintainer-email: binwei.zeng3@gmail.com
Platform: all
Description-Content-Type: text/markdown

# Bigdata Utility


## Install

```shell
pip install py-bigdata-util --upgrade
```

## Usage

### Config
> Load configuration from a local config file.

```py

#!/usr/bin/env python3
# -*- coding=utf-8 -*-
import unittest

from bigdata_util.util import get_absolute_path
from bigdata_util.util import BaseConfig, Singleton

'''
# config.test_conf
a = a
b = 1
c = c
'''

class MyConfig(BaseConfig, metaclass=Singleton):
    def __init__(self, cfg: dict = None):
        """
        :type cfg: object
        """
        super(MyConfig, self).__init__(
            cfg,
            # the config file follows the ConfigObj format:
            # https://configobj.readthedocs.io/en/latest/configobj.html#the-config-file-format
            get_absolute_path(__file__, '../config.test_conf')
        )
        pass


class ConfigTest(unittest.TestCase):

    def test_config(self):
        config = {
            'aa': 'aa',
            'bb': 2,
            'c': 'cc'
        }
        c = MyConfig(cfg=config)
        self.assertEqual(c.get('a', 'default'), 'a')
        self.assertEqual(c.get('aa', 'default'), 'aa')
        self.assertEqual(c.get('b', 'default'), 1)
        self.assertEqual(c.get('bb', 'default'), 2)
        self.assertEqual(c.get('c', 'default'), 'cc')
        pass

```

### Connectors

MaxcomputeConnector features:

- `odps_ins = MaxcomputeConnector(cfg.get('connector.odps'))`: connects to ODPS; a dedicated-network logview address can be configured, and HTTP and SOCKS proxies are supported
- `odps_ins.get_table_data`: fetches table data and caches it in the local `.connector_maxcompute_cache` directory; the remote table's last_modified_time is checked to decide whether to refresh the cache
- `odps_ins.update_mr_jar`: picks up the jar from a local Java project directory, uploads it to ODPS as a resource, and returns the resource name
- `odps_ins.update_udf`: updates a UDF
- `odps_ins.run_sql_with_logview_return_plain_json`: runs SQL, prints the logview, and returns the result (see the sketch after this list)
- `odps_ins.run_sql_in_file`: runs a .sql file; `${variable}` and `${instance.attribute}` placeholders in the file are supported
- `odps_ins.create_ots_external_table`: creates an OTS external table

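For example, an ad-hoc query looks roughly like the sketch below. This is a minimal, illustrative sketch: the query itself is a placeholder, and the return value is assumed to be a plain list of dicts, one per row:

```py
from bigdata_util.connector import MaxcomputeConnector
from .my_config import MyConfig

cfg = MyConfig()
odps_ins = MaxcomputeConnector(cfg.get('connector.odps'))

# Runs the SQL on MaxCompute, prints the logview URL, and returns the result rows.
rows = odps_ins.run_sql_with_logview_return_plain_json('''
    select * from dual_point where dt = '1' limit 10
''')
for row in rows:
    print(row)
```
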
Using ODPS together with PostgreSQL:
```py
from bigdata_util.connector import MaxcomputeConnector, PostGreConnector
from .my_config import MyConfig

cfg = MyConfig()
odps = MaxcomputeConnector(cfg.get('connector.odps'))
postgre = PostGreConnector(cfg.get('connector.postgres'))

# fetch and cache data.
data_list = odps.get_table_data('dual_point', partition='dt=1', ignore_cache=True)
# on later reads, the remote table's last_modified_time decides whether to re-fetch
data_list = odps.get_table_data('dual_point', partition='dt=1', ignore_cache=False)
postgre.save_geometry_table('dual_point', data_list, col_name_wkt='wkt', geometry_type='point')
```

Using OTS (TableStore):

```py
from bigdata_util.connector import TableStoreConnector
from .my_config import MyConfig

cfg = MyConfig()
ots_cli = TableStoreConnector(cfg.get('connector.ots'))
consumed, return_row, next_token = ots_cli.get_row('dual', [('id', 1)])
```

### Running a Java MR job

The MR logic is already implemented in Java; it is then invoked from Python.

Define:
```py
#!/usr/bin/env python3
# -*- coding=utf-8 -*-

import os

from bigdata_util.util import get_logger, get_absolute_path
from bigdata_util.connector import MaxcomputeConnector
from bigdata_util.mr_launcher import MapReduceLauncher
from .my_config import CustomConfig


logger = get_logger(__file__)
cfg = CustomConfig()
odps_cst = MaxcomputeConnector(cfg.get('connector.odps_cst'))


class CalculateDistanceOfPools(MapReduceLauncher):

    def init_mr_jar_base_path(self, mr_jar_base_path: str = None):
        return cfg.get('mr_jar_base_path')

    def init_project_base_path(self, mr_jar_base_path: str = None):
        return cfg.get('project_base_path')

    @staticmethod
    def init_odps_conf_file_name():
        return 'odps_cst_conf.ini'

    def init_mapper_class(self, mapper_class: str = None):
        return 'com.aliyun.citybrain.traffic.commonmapper.RawMapper'

    def init_reducer_class(self, reducer_class: str = None):
        return 'com.aliyun.citybrain.traffic.pooldistance.Reducer'

    def init_maxcompute_ins(self, maxcompute_ins: MaxcomputeConnector = None):
        return odps_cst

    def init_mr_jar_name(self, mr_jar_name: str = None):
        return 'traffic-algo-mg-1.0-SNAPSHOT-jar-with-dependencies.jar'


    def init_system_parameters(self, system_parameters: dict = None):
        """
        Only controls the following three parameters; the defaults are:
          -splitSize 32
          -reduceCnt 100
          -reduceMem 4096
        :param system_parameters: e.g. {'splitSize': '16', 'reduceCnt': '900', 'reduceMem': 4096}
        :return: may be None
        """
        return {
            'reduceCnt': 900
        }

    def init_ddl_file_path(self, ddl_file_path: str = None):
        return os.path.abspath(os.path.join(
            os.path.split(os.path.realpath(__file__))[0],
            'step_3_pool_clustering_euclidean_distance_between_pool_mr.osql'
        ))

    def run(self):
        self.launch()


if __name__ == '__main__':
    CalculateDistanceOfPools().run()
```

Use:

```py
from . import CalculateDistanceOfPools
from .my_config import CustomConfig
# TableInfo (an input/output table plus its partition) is assumed to be importable
# from the project's own modules; its exact import path is omitted here.

cfg = CustomConfig()

mr = CalculateDistanceOfPools()
mr.set_init_parameters({
    'mapper.key': 'pool_id:string',
    'mapper.value': 'pool_id:string,sub_polygon:string',
    'geometry_col_name': 'sub_polygon',
    'geometry_gen_type': 'wkt',
    'input_table': TableInfo('algtmp_tfc_vhcpool_info', partition=cfg.COMMON_PARTITION_WITH_DT),
    'output_table': TableInfo('algtmp_rltn_euclidean_distance_pool_pool', partition=cfg.COMMON_PARTITION_WITH_DT),
    'overlap_length': 1,       # at least 50 m of overlap
    'overlap_ratio': 0.1,
    'threshold.MAX_DISTANCE': 0
})
mr.launch()

# prints the logview of the MR run...
```

### Plot

```py

from . import MyConfig
from bigdata_util.plot import PlotLine
from bigdata_util.connector import MaxcomputeConnector

cfg = MyConfig()
plot_line = PlotLine(MaxcomputeConnector(cfg.get('connector.odps')))
plot_line.plot_line('''
    select
      stat_time x,
      value y,
      name label
    from dual
''')
```

![fig](resource/figure.png)

## Connector dependencies

### odps

```sh
pip install pyodps==0.9.1
```

### hive

```sh
pip install impyla==0.16.3
pip install thrift_sasl==0.4.2  # for macOS install notes, see: https://github.com/albin3/book-notes/issues/2
```

### hive kerberos

Install system dependencies first:

- macOS: `brew install krb5`
- Linux: `apt install -y krb5-user`

```sh
pip install bit_array
pip install thrift
pip install thrift_sasl
pip install impyla
pip install krbcontext
pip install hdfs[kerberos] -i https://mirrors.aliyun.com/pypi/simple
pip install pykerberos -i https://mirrors.aliyun.com/pypi/simple
```
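
Once the dependencies are installed, a minimal connection sketch using impyla directly (not the bigdata_util connector) looks like this; host, port, and service name are placeholders, and a valid Kerberos ticket is assumed to already exist (e.g. obtained via `kinit`):

```py
from impala.dbapi import connect

# Connect to HiveServer2 over GSSAPI/Kerberos.
conn = connect(
    host='<hive_server_host>',
    port=10000,
    auth_mechanism='GSSAPI',
    kerberos_service_name='hive',
)
cursor = conn.cursor()
cursor.execute('show tables')
for row in cursor.fetchall():
    print(row)
cursor.close()
conn.close()
```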

### kafka

```sh
pip install kafka-python==2.0.1
```
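
A minimal kafka-python sketch (using the library directly, not a bigdata_util connector); the broker address and topic name are placeholders:

```py
from kafka import KafkaConsumer, KafkaProducer

# Produce one message.
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('demo_topic', b'hello')
producer.flush()

# Consume from the beginning of the topic and stop after 5 seconds of inactivity.
consumer = KafkaConsumer(
    'demo_topic',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    consumer_timeout_ms=5000,
)
for message in consumer:
    print(message.topic, message.partition, message.offset, message.value)
```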

## pg with proxy

```sh
pip install asyncpg
```

usage:

```python
from bigdata_util.connector.postgre_async import PostgreAsyncConnector

table_name = 'algtmp_connector_test_pg'
pg_conn = PostgreAsyncConnector(
    'postgresql://<user_name>:<passwd>@<host_name>/<data_base>',
    'socks5://<host_name>:<port>'
)
pg_conn.execute_sql(f'''
    create table if not exists {table_name} (
        a varchar,
        b varchar,
        c varchar,
        d varchar,
        PRIMARY KEY (a, b, c, d)
    )
''')
pg_conn.save_data(table_name, [
    {'a': 'a1', 'b': 'b1', 'c': 'c2', 'd': 'd1'}
])
data_list = pg_conn.run_sql_return_plain_json(f'''
    select * from {table_name}
''')
pg_conn.drop_table(table_name)
```

## mysql with proxy

> Currently only queries are supported; writing data is not.

```sh
pip install PyMySQL
```

usage:

```python
from bigdata_util.connector.mysql import MysqlConnector

mysql_conn = MysqlConnector(
    host='localhost',
    port=3306,
    user='root',
    password='password',
    database='default',
    proxy='socks5://<host_name>:<port>',
)
data_list = mysql_conn.run_sql_return_plain_json('''
    show tables;
''')
```

## install in p36

You may need to install numpy, pandas, and shapely manually before installing:

```sh
pip install numpy pandas shapely
pip install py-bigdata-util
```
