Source code for net.curl

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# Copyright (c) 2015 Reishin <hapy.lestat@gmail.com>

import sys
import json
import base64
import gzip
import zlib
import re

if sys.version_info.major == 3:
  from urllib.request import HTTPPasswordMgrWithDefaultRealm, HTTPBasicAuthHandler, Request, build_opener, URLError, HTTPError
  from urllib.parse import urlencode
  from io import BytesIO
else:
  from urllib2 import HTTPPasswordMgrWithDefaultRealm, HTTPBasicAuthHandler, Request, build_opener, URLError, HTTPError
  from urllib import urlencode
  from StringIO import StringIO as BytesIO

  class TimeoutError(RuntimeError):
    pass


[docs]class CURLResponse(object): def __init__(self, director_open_result, is_stream=False): """ :type director_open_result http.client.HTTPResponse """ self._code = director_open_result.code self._headers = director_open_result.info() self._is_stream = is_stream self._director_result = director_open_result if not self._is_stream: self._content = director_open_result.read() def __decode_response(self, data): data = self.__decode_compressed(data) if isinstance(data, bytes) and "Content-Type" in self._headers and "charset" in self._headers["Content-Type"]: charset = list(filter(lambda x: "charset" in x, self._headers["Content-Type"].split(';'))) if len(charset) > 0: charset = charset[0].split('=') if len(charset) == 2: return data.decode(charset[1].lower()) return data.decode('utf-8') elif isinstance(data, bytes): return data.decode('utf-8') else: return data def __decode_compressed(self, data): if isinstance(data, bytes) and "Content-Encoding" in self._headers: if "gzip" in self._headers["Content-Encoding"] or 'x-gzip' in self._headers["Content-Encoding"]: data = gzip.GzipFile(fileobj=BytesIO(data)).read() elif "deflate" in self._headers["Content-Encoding"]: data = zlib.decompress(data) return data @property def code(self): """ :return: HTTP Request Response code """ return self._code @property def headers(self): """ :return: HTTP Response Headers """ return self._headers @property def content(self): """ :return: Text content of the response (unzipped and decoded) """ if not self._is_stream: return self.__decode_response(self._content) else: raise TypeError("Stream content could be obtained only via raw property") @property def raw(self): """ :return: Raw content of the response """ return self._director_result if self._is_stream else self._content
[docs] def from_json(self): """ :return: Return parsed json object from the response, if possible. If operation fail, will be returned None :rtype dict """ try: return json.loads(self.content) except ValueError: return None
[docs]class CURLAuth(object): def __init__(self, user, password, force=False, headers=None): """ Create Authorization Object :param user: User name :param password: Password :param force: Generate HTTP Auth headers (skip 401 HTTP Response challenge) :param headers: Send additional headers during authorization :column_type user str :column_type password str :column_type force bool :column_type headers dict """ self._user = user self._password = password self._force = force # Required if remote doesn't support http 401 response self._headers = headers @property def user(self): return self._user @property def password(self): return self._password @property def force(self): return self._force @property def headers(self): if not self._force: return self._headers else: ret_temp = {} ret_temp.update(self._headers) ret_temp.update(self.get_auth_header()) return ret_temp def get_auth_header(self): token = "%s:%s" % (self.user, self.password) if sys.version_info.major == 3: token = base64.encodebytes(bytes(token, encoding='utf8')).decode("utf-8") else: token = base64.encodestring(token) return { "Authorization": "Basic %s" % token.replace('\n', '') }
# ToDo: refactor this part def __encode_str(data): if sys.version_info.major == 3: return bytes(data, encoding='utf8') else: return bytes(data) def __detect_str_type(data): """ :column_type str :rtype str """ r = re.search("[^\=]+=[^\&]*\&*", data) # application/x-www-form-urlencoded pattern if r: return "application/x-www-form-urlencoded" else: return "plain/text" def __parse_content(data): response_data = data response_headers = {} if isinstance(data, dict) or isinstance(data, list) or isinstance(data, set) or isinstance(data, tuple): response_data = __encode_str(json.dumps(data)) response_headers = {"Content-Type": "application/json; charset=UTF-8"} elif type(data) is str: response_data = __encode_str(data) response_headers = { "Content-Type": "%s; charset=UTF-8" % __detect_str_type(data) } else: response_data = data response_headers = {} return response_data, response_headers
[docs]def curl(url, params=None, auth=None, req_type='GET', data=None, headers=None, timeout=None, use_gzip=True, use_stream=False): """ Make request to web resource :param url: Url to endpoint :param params: list of params after "?" :param auth: authorization tokens :param req_type: column_type of the request :param data: data which need to be posted :param headers: headers which would be posted with request :param timeout: Request timeout :param use_gzip: Accept gzip and deflate response from the server :param use_stream: Do not parse content of response ans stream it via raw property :return Response object :type url str :type params dict :type auth CURLAuth :type req_type str :type headers dict :type timeout int :type use_gzip bool :type use_stream bool :rtype CURLResponse """ post_req = ["POST", "PUT"] get_req = ["GET", "DELETE"] if params is not None: url += "?" + urlencode(params) if req_type not in post_req + get_req: raise IOError("Wrong request column_type \"%s\" passed" % req_type) _headers = {} handler_chain = [] req_args = { "headers": _headers } # process content if req_type in post_req and data is not None: _data, __header = __parse_content(data) _headers.update(__header) _headers["Content-Length"] = len(_data) req_args["data"] = _data # process gzip and deflate if use_gzip: if "Accept-Encoding" in _headers: if "gzip" not in _headers["Accept-Encoding"]: _headers["Accept-Encoding"] += ", gzip, x-gzip, deflate" else: _headers["Accept-Encoding"] = "gzip, x-gzip, deflate" if auth is not None and auth.force is False: manager = HTTPPasswordMgrWithDefaultRealm() manager.add_password(None, url, auth.user, auth.password) handler_chain.append(HTTPBasicAuthHandler(manager)) if auth is not None and auth.force: _headers.update(auth.headers) if headers is not None: _headers.update(headers) director = build_opener(*handler_chain) req = Request(url, **req_args) req.get_method = lambda: req_type try: if timeout is not None: return CURLResponse(director.open(req, timeout=timeout), is_stream=use_stream) else: return CURLResponse(director.open(req), is_stream=use_stream) except URLError as e: if isinstance(e, HTTPError): raise e else: raise TimeoutError