源码网商城,靠谱的源码在线交易网站 我的订单 购物车 帮助

源码网商城

python实现rest请求api示例

  • 时间:2021-04-24 12:53 编辑: 来源: 阅读:
  • 扫一扫,手机访问
摘要:python实现rest请求api示例
该代码参考新浪python api,适用于各个开源api请求
[u]复制代码[/u] 代码如下:
# -*- coding: utf-8 -*- import collections import gzip import urllib import urllib2 from urlparse import urlparse try:     from cStringIO import StringIO except ImportError:     from StringIO import StringIO try:     import json except ImportError:     import simplejson as json __author__ = 'myth' _HTTP_GET = 1 _HTTP_POST = 2 # 超时时间(秒) TIMEOUT = 45 RETURN_TYPE = {"json": 0, "xml": 1, "html": 2, "text": 3} _METHOD_MAP = {'GET': _HTTP_GET, 'POST': _HTTP_POST} class APIError(StandardError):     """     raise APIError if receiving json message indicating failure.     """     def __init__(self, error_code, error, request):         self.error_code = error_code         self.error = error         self.request = request         StandardError.__init__(self, error)     def __str__(self):         return 'APIError: %s: %s, request: %s' % (self.error_code, self.error, self.request) # def callback_type(return_type='json'): # #     default_type = "json" #     default_value = RETURN_TYPE.get(default_type) #     if return_type: #         if isinstance(return_type, (str, unicode)): #             default_value = RETURN_TYPE.get(return_type.lower(), default_value) #     return default_value def _format_params(_pk=None, encode=None, **kw):     """     :param kw:     :type kw:     :param encode:     :type encode:     :return:     :rtype:     """     __pk = '%s[%%s]' % _pk if _pk else '%s'     encode = encode if encode else urllib.quote     for k, v in kw.iteritems():         _k = __pk % k         if isinstance(v, basestring):             qv = v.encode('utf-8') if isinstance(v, unicode) else v             _v = encode(qv)             yield _k, _v         elif isinstance(v, collections.Iterable):             if isinstance(v, dict):                 for _ck, _cv in _format_params(_pk=_k, encode=encode, **v):                     yield _ck, _cv             else:                 for i in v:                     qv = i.encode('utf-8') if isinstance(i, unicode) else str(i)                     _v = encode(qv)                     yield _k, _v         else:             qv = str(v)             _v = encode(qv)             yield _k, _v def encode_params(**kw):     """     do url-encode parameters     >>> encode_params(a=1, b='R&D')     'a=1&b=R%26D'     >>> encode_params(a=u'\u4e2d\u6587', b=['A', 'B', 123])     'a=%E4%B8%AD%E6%96%87&b=A&b=B&b=123'     >>> encode_params(**{         'a1': {'aa1': 1, 'aa2': {'aaa1': 11}},         'b1': [1, 2, 3, 4],         'c1': {'cc1': 'C', 'cc2': ['Q', 1, '@'], 'cc3': {'ccc1': ['s', 2]}}     })     'a1[aa1]=1&a1[aa2][aaa1]=11&c1[cc1]=C&c1[cc3][ccc1]=s&c1[cc3][ccc1]=2&c1[cc2]=Q&c1[cc2]=1&c1[cc2]=%40&b1=1&b1=2&b1=3&b1=4'     """     # args = []     # for k, v in kw.iteritems():     #     if isinstance(v, basestring):     #         qv = v.encode('utf-8') if isinstance(v, unicode) else v     #         args.append('%s=%s' % (k, urllib.quote(qv)))     #     elif isinstance(v, collections.Iterable):     #         for i in v:     #             qv = i.encode('utf-8') if isinstance(i, unicode) else str(i)     #             args.append('%s=%s' % (k, urllib.quote(qv)))     #     else:     #         qv = str(v)     #         args.append('%s=%s' % (k, urllib.quote(qv)))     # return '&'.join(args)     args = []     _urlencode = kw.pop('_urlencode', urllib.quote)     for k, v in _format_params(_pk=None, encode=_urlencode, **kw):         args.append('%s=%s' % (k, v))     args = sorted(args, key=lambda s: s.split("=")[0])     return '&'.join(args) def _read_body(obj):     using_gzip = obj.headers.get('Content-Encoding', '') == 'gzip'     body = obj.read()     if using_gzip:         gzipper = gzip.GzipFile(fileobj=StringIO(body))         fcontent = gzipper.read()         gzipper.close()         return fcontent     return body class JsonDict(dict):     """     general json object that allows attributes to be bound to and also behaves like a dict     """     def __getattr__(self, attr):         try:             return self[attr]         except KeyError:             raise AttributeError(r"'JsonDict' object has no attribute '%s'" % attr)     def __setattr__(self, attr, value):         self[attr] = value def _parse_json(s):     """     parse str into JsonDict     """     def _obj_hook(pairs):         """         convert json object to python object         """         o = JsonDict()         for k, v in pairs.iteritems():             o[str(k)] = v         return o     return json.loads(s, object_hook=_obj_hook) def _parse_xml(s):     """     parse str into xml     """     raise NotImplementedError() def _parse_html(s):     """     parse str into html     """     raise NotImplementedError() def _parse_text(s):     """     parse str into text     """     raise NotImplementedError() def _http_call(the_url, method, return_type="json", request_type=None, request_suffix=None, headers={}, _timeout=30, **kwargs):     """     the_url: 请求地址     method 请求方法(get,post)     return_type: 返回格式解析     request_suffix: 请求地址的后缀,如jsp,net     _timeout: 超时时间     kwargs: 请求参数     """     http_url = "%s.%s" (the_url, request_suffix) if request_suffix else the_url     if request_type == 'json':         headers['Content-Type'] = 'application/json'         # method = _HTTP_POST         # json_data = json.dumps(kwargs)         # # convert str to bytes (ensure encoding is OK)         # params = json_data.encode('utf-8')         params = json.dumps(kwargs)     else:         params = encode_params(**kwargs)         http_url = '%s?%s' % (http_url, params) if method == _HTTP_GET else http_url     print http_url     # u = urlparse(http_url)     # headers.setdefault("host", u.hostname)     http_body = None if method == _HTTP_GET else params     req = urllib2.Request(http_url, data=http_body, headers=headers)     callback = globals().get('_parse_{0}'.format(return_type))     if not hasattr(callback, '__call__'):         print "return '%s' unable to resolve" % return_type         callback = _parse_json     try:         resp = urllib2.urlopen(req, timeout=_timeout if _timeout else TIMEOUT)         body = _read_body(resp)         r = callback(body)         # if hasattr(r, 'error_code'):         #     raise APIError(r.error_code, r.get('error', ''), r.get('request', ''))         return r     except urllib2.HTTPError, e:         try:             body = _read_body(e)             r = callback(body)             return r         except:             r = None         # if hasattr(r, 'error_code'):         #     raise APIError(r.error_code, r.get('error', ''), r.get('request', ''))             raise e class HttpObject(object):     def __init__(self, client, method):         self.client = client         self.method = method     def __getattr__(self, attr):         def wrap(**kw):             if attr:                 the_url = '%s/%s' % (self.client.api_url, attr.replace('__', '/'))             else:                 the_url = self.client.api_url             return _http_call(the_url, self.method, **kw)         return wrap     def __call__(self, **kw):         return _http_call(self.client.api_url, self.method, **kw) class APIClient(object):     """     使用方法:         比如:api 请求地址为:http://api.open.zbjdev.com/kuaiyinserv/kuaiyin/billaddress              请求方式为: GET              需要的参数为:user_id 用户的UID                          is_all 是否查询所有数据,0为默认邮寄地址 1为全部邮寄地址                          access_token 平台认证              返回数据为:json         那么此时使用如下:         domain = "api.open.zbjdev.com"         #如果是https请求,需要将is_https设置为True         client = APIClient(domain)         data = {"user_id": "14035462", "is_all": 1, "access_token": "XXXXXXXXXX"}         # 如果是post请求,请将get方法改为post方法         result = client.kuaiyinserv.kuaiyin.billaddress.get(return_type="json", **data)         #等同于         # result = client.kuaiyinserv__kuaiyin__billaddress__get(return_type="json", **data)         # result = client.kuaiyinserv__kuaiyin__billaddress(return_type="json", **data)     """     def __init__(self, domain, is_https=False):         http = "http"         if domain.startswith("http://") or domain.startswith("https://"):             http, domain = domain.split("://")         # else:         if is_https:             http = "https"         self.api_url = ('%s://%s' % (http, domain)).rstrip("/")         self.get = HttpObject(self, _HTTP_GET)         self.post = HttpObject(self, _HTTP_POST)     def __getattr__(self, attr):         if '__' in attr:             method = self.get             if attr[-6:] == "__post":                 attr = attr[:-6]                 method = self.post             elif attr[-5:] == "__get":                 attr = attr[:-5]             if attr[:2] == '__':                 attr = attr[2:]             return getattr(method, attr)         return _Callable(self, attr) class _Executable(object):     def __init__(self, client, method, path):         self._client = client         self._method = method         self._path = path     def __call__(self, **kw):         method = _METHOD_MAP[self._method]         return _http_call('%s/%s' % (self._client.api_url, self._path), method, **kw)     def __str__(self):         return '_Executable (%s %s)' % (self._method, self._path)     __repr__ = __str__ class _Callable(object):     def __init__(self, client, name):         self._client = client         self._name = name     def __getattr__(self, attr):         if attr == 'get':             return _Executable(self._client, 'GET', self._name)         if attr == 'post':             return _Executable(self._client, 'POST', self._name)         name = '%s/%s' % (self._name, attr)         return _Callable(self._client, name)     def __str__(self):         return '_Callable (%s)' % self._name     __repr__ = __str__ def test_logistics():     domain = "https://api.kuaidi100.com/api"     #如果是https请求,需要将is_https设置为True     client = APIClient(domain)     data = {"id": "45f2d1f2sds", "com": "yunda", "nu": "1500066330925"}     result = client.__get(_timeout=2, **data)     print result     print result["message"]     print result.get("message")     print result.message if __name__ == '__main__':     # test_logistics()     data = {         "data":{             "id": 1,             "pid": 3,             "name": u'中的阿斯顿'         },         "sign": 'asdfdsdsfsdf',     }     domain = "kuaiyin.zhubajie.com"     client = APIClient(domain)     # headers = {'Content-Type': 'application/json'}     headers = {"host": domain}     result = client.api.zhubajie.fz.info.post(return_type="json", request_type="json", headers=headers, **data)     print result     print result['data']['msg']     c = APIClient('task.6.zbj.cn')     r = getattr(c.api, 'kuaiyin-action-delcache').post(request_type="json",                                                        headers={},                                                        **{"sign": "",                                                           "data": {                                                               "product": "pvc",                                                               "category": "card",                                                               "req_type": "pack_list"                                                           }})     print r     print r['data']['msg']
  • 全部评论(0)
联系客服
客服电话:
400-000-3129
微信版

扫一扫进微信版
返回顶部