# Source code for pyfarm.agent.http.core.client

# No shebang line, this module is meant to be imported
#
# Copyright 2013 Oliver Palmer
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
HTTP Client
===========

The client library the manager uses to communicate with
the master server.
"""

import os
import json
from collections import namedtuple
from functools import partial
from random import random
from urlparse import urlparse
from uuid import UUID, uuid4

try:
    from httplib import (
        responses, INTERNAL_SERVER_ERROR, BAD_REQUEST, MULTIPLE_CHOICES
    )

except ImportError:  # pragma: no cover
    from http.client import (
        responses, INTERNAL_SERVER_ERROR, BAD_REQUEST, MULTIPLE_CHOICES
    )

try:
    from UserDict import UserDict
except ImportError:  # pragma: no cover
    from collections import UserDict

try:
    import ssl
except ImportError:  # pragma: no cover
    ssl = NotImplemented

try:
    import PyOpenSSL
except ImportError:  # pragma: no cover
    PyOpenSSL = NotImplemented


try:
    import service_identity
except ImportError:  # pragma: no cover
    service_identity = NotImplemented

import treq
try:
    from treq.response import _Response as TQResponse
except ImportError:  # pragma: no cover
    TQResponse = NotImplemented

from twisted.internet.defer import Deferred
from twisted.internet.protocol import Protocol, connectionDone
from twisted.python import log
from twisted.python.failure import Failure
from twisted.web.client import (
    Response as TWResponse, GzipDecoder as TWGzipDecoder, ResponseDone)
from twisted.web._newclient import (
    ResponseNeverReceived, RequestTransmissionFailed)

from pyfarm.core.enums import STRING_TYPES, NOTSET, INTEGER_TYPES
from pyfarm.core.utility import ImmutableDict
from pyfarm.agent.config import config
from pyfarm.agent.logger import getLogger
from pyfarm.agent.utility import quote_url, json_safe

logger = getLogger("agent.http.client")

# Response classes which are allowed as the `response` argument to
# Response.__init__.  treq's private _Response class is only included
# when it could be imported above.
RESPONSE_CLASSES = (TWResponse, TWGzipDecoder)
if TQResponse is not NotImplemented:
    RESPONSE_CLASSES += (TQResponse,)

# Default value for the User-Agent header sent by request().
USERAGENT = "PyFarm/1.0 (agent)"

# Number types accepted for the offset/factor of http_retry_delay().
DELAY_NUMBER_TYPES = tuple(INTEGER_TYPES) + (float,)

# HTTP request methods and url schemes known to this module.
HTTP_METHODS = frozenset(["HEAD", "GET", "POST", "PUT", "PATCH", "DELETE"])
HTTP_SCHEMES = frozenset(["http", "https"])


class HTTPLog(object):
    """
    Provides a wrapper around the http logger so requests and
    responses can be logged in a standardized fashion.

    Every method logs only the last 12 hex characters of the request's
    ``uid`` so log lines stay short while still being correlatable.
    """

    @staticmethod
    def queue(method, url, uid=None):
        """
        Logs the request we're asking treq to queue.

        :param str method:
            The HTTP method of the request.

        :param str url:
            The url the request will be made to.

        :param UUID uid:
            Identifier used to correlate log lines belonging to the same
            request.  Required despite the ``None`` default.
        """
        assert isinstance(uid, UUID)
        logger.debug("Queue %s %s (uid: %s)", method, url, uid.hex[20:])

    @staticmethod
    def response(response, uid=None):
        """
        Logs the return code of a request that treq completed.  Intended
        to be attached as a callback; the ``response`` is returned
        unchanged so later callbacks still receive it.

        :param response:
            The treq response object.

        :param UUID uid:
            Identifier used to correlate log lines belonging to the same
            request.  Required despite the ``None`` default.
        """
        assert isinstance(response, TQResponse)
        assert isinstance(uid, UUID)
        message = "%s %s %s %s (uid: %s)"
        args = (
            response.code, responses.get(response.code, "UNKNOWN"),
            response.request.method, response.request.absoluteURI,
            uid.hex[20:])

        # Any 4xx or 5xx status is an error.  BAD_REQUEST (400) is the
        # lowest such code, so a single comparison also covers
        # INTERNAL_SERVER_ERROR (500) and above.
        if response.code >= BAD_REQUEST:
            logger.error(message, *args)
        else:
            logger.info(message, *args)

        # Return so response can be handled by another callback
        return response

    @staticmethod
    def error(failure, uid=None, method=None, url=None):
        """
        Called when the treq request experiences an error and calls the
        ``errback`` method.  Logs the failure and its traceback, then
        re-raises ``failure`` so later errbacks still see the error.

        :param Failure failure:
            The failure delivered to the errback.

        :param UUID uid:
            Identifier used to correlate log lines belonging to the same
            request.  Required despite the ``None`` default.

        :param str method:
            The HTTP method of the failed request (used only for logging).

        :param str url:
            The url of the failed request (used only for logging).
        """
        assert isinstance(uid, UUID)
        assert isinstance(failure, Failure)
        message = ("%s %s has failed (uid: %s):%s%s" % (
            method, url, uid.hex[20:], os.linesep, failure.getTraceback()))

        # RequestTransmissionFailed and ResponseNeverReceived can happen as
        # part of normal operations, and do not necessarily count as errors
        if failure.type in (ResponseNeverReceived, RequestTransmissionFailed):
            logger.debug(message)
        else:
            logger.error(message)

        # Reraise the error so other code can handle the error
        raise failure
def build_url(url, params=None):
    """
    Builds the full url when provided the base ``url`` and some
    url parameters:

    >>> build_url("/foobar", {"first": "foo", "second": "bar"})
    '/foobar?first=foo&second=bar'
    >>> build_url("/foobar bar/")
    '/foobar%20bar/'

    :param str url:
        The url to build off of.  If it already contains a query string
        the new parameters are appended to it with ``&`` rather than
        starting a second query string.

    :param dict params:
        A dictionary of parameters that should be added on to ``url``.
        If this value is not provided ``url`` will be returned by itself.
        Arguments to a url are unordered by default; however, they will
        be sorted alphabetically so the results are repeatable from call
        to call.
    """
    assert isinstance(url, STRING_TYPES)

    # append url arguments
    if isinstance(params, (dict, ImmutableDict, UserDict)) and params:
        if "?" not in url:
            separator = "?"
        elif url.endswith(("?", "&")):
            # The url already ends where a new argument can start.
            separator = ""
        else:
            # Extend the existing query string instead of adding a
            # second "?", which would produce a malformed url.
            separator = "&"

        url += separator + "&".join(
            "%s=%s" % (key, value) for key, value in sorted(params.items()))

    return quote_url(url)
def http_retry_delay(offset=None, factor=None, rand=None):
    """
    Returns a floating point value that can be used to delay an http
    request.  The main purpose of this is to ensure that not all requests
    are run with the same interval between them.  The basic formula for
    the retry delay is:

        .. code:: python

            offset + (random() * factor)

    so the result always falls in the range ``[offset, offset + factor)``
    when the default :func:`random` is used.

    :type offset: int or float
    :param offset:
        The initial offset to start the calculation at.  This defaults
        to the ``agent_http_retry_delay_offset`` configuration variable.

    :type factor: int or float
    :param factor:
        The factor to multiply the output from :func:`random` by.  This
        defaults to the ``agent_http_retry_delay_factor`` configuration
        variable.

    :param rand:
        A callable to determine randomness, defaulting to
        :func:`random`.  This is mainly used for testing purposes.
    """
    if factor is None:
        factor = config["agent_http_retry_delay_factor"]

    if offset is None:
        offset = config["agent_http_retry_delay_offset"]

    if rand is None:
        rand = random

    assert isinstance(factor, DELAY_NUMBER_TYPES)
    assert isinstance(offset, DELAY_NUMBER_TYPES)

    return offset + (rand() * factor)
class Request(namedtuple("Request", ("method", "url", "kwargs"))):
    """
    Immutable record of a request made through :func:`request`: the
    HTTP ``method``, the ``url``, and the keyword arguments (``kwargs``)
    that were originally supplied.  Together these are everything
    :meth:`retry` needs to issue the same request again.
    """

    def retry(self, **kwargs):
        """
        Re-issues the original request via :func:`request`, using the
        original method, url and keyword arguments.

        :param kwargs:
            Keyword arguments that replace the matching original
            keyword argument(s) for this retry only.
        """
        # Start from the recorded arguments, then apply any overrides.
        merged = self.kwargs.copy()
        merged.update(kwargs)

        # ``params`` are folded into the url for the log line, so they
        # are removed from the kwargs printed next to it.
        shown = merged.copy()
        shown_url = build_url(self.url, shown.pop("params", None))
        logger.debug(
            "Retrying %s %s, kwargs: %r", self.method, shown_url, shown)

        return request(self.method, self.url, **merged)
class Response(Protocol):
    """
    This class receives the incoming response body from a request and
    constructs some convenience methods and attributes around the data.

    :param Deferred deferred:
        The deferred object which contains the target callback
        and errback.

    :param response:
        The initial response object which will be passed along
        to the target deferred.

    :param Request request:
        Named tuple object containing the method name, url, and the
        original keyword arguments.
    """
    def __init__(self, deferred, response, request):
        assert isinstance(deferred, Deferred)
        assert isinstance(response, RESPONSE_CLASSES)
        assert isinstance(request, Request)

        # internal attributes
        self._done = False  # set once the full body has been received
        self._body = ""  # accumulates chunks passed to dataReceived()
        self._deferred = deferred

        # main public attributes
        self.request = request
        self.response = response

        # convenience attributes constructed
        # from the public attributes
        self.method = self.request.method
        self.url = self.request.url
        self.code = self.response.code
        self.content_type = None

        # consume the response headers; a header with exactly one value
        # is stored as that value, otherwise the sequence of values is
        # kept as-is
        self.headers = {}
        for header_key, header_value in response.headers.getAllRawHeaders():
            if len(header_value) == 1:
                header_value = header_value[0]

            self.headers[header_key] = header_value

        # determine the content type
        if "Content-Type" in self.headers:
            self.content_type = self.headers["Content-Type"]

    def data(self):
        """
        Returns the data currently contained in the buffer.

        :raises RuntimeError:
            Raised if this method is called before all data has been
            received.
        """
        if not self._done:
            raise RuntimeError("Response not yet received.")
        return self._body

    def json(self, loader=json.loads):
        """
        Returns the json data from the incoming request.

        Only the media type of the ``Content-Type`` header is checked, so
        parameters such as ``; charset=utf-8`` are accepted.

        :param loader:
            Callable used to decode the body, :func:`json.loads` by
            default.

        :raises RuntimeError:
            Raised if this method is called before all data has been
            received.

        :raises ValueError:
            Raised if the content type for this request is not
            application/json.
        """
        if not self._done:
            raise RuntimeError("Response not yet received.")

        # Strip media type parameters (RFC 7231 section 3.1.1.1) so that
        # e.g. "application/json; charset=utf-8" is recognized as JSON.
        # Multi-valued Content-Type headers (stored as a list) are left
        # untouched and therefore rejected, as before.
        media_type = self.content_type
        if isinstance(media_type, STRING_TYPES):
            media_type = media_type.split(";", 1)[0].strip().lower()

        if media_type != "application/json":
            raise ValueError("Not an application/json response.")

        return loader(self._body)

    def dataReceived(self, data):
        """
        Overrides :meth:`.Protocol.dataReceived` and appends data
        to ``_body``.
        """
        self._body += data

    def connectionLost(self, reason=connectionDone):
        """
        Overrides :meth:`.Protocol.connectionLost` and sets the ``_done``
        when complete.  When called with :class:`.ResponseDone` for
        ``reason`` this method will call the callback on ``_deferred``;
        any other ``reason`` is passed to the errback instead.
        """
        if reason.type is ResponseDone:
            self._done = True
            url = build_url(self.request.url, self.request.kwargs.get("params"))
            code_text = responses.get(self.code, "UNKNOWN")
            logger.debug(
                "%s %s %s %s, body: %s",
                self.code, code_text, self.request.method, url, self._body)
            self._deferred.callback(self)
        else:
            self._deferred.errback(reason)
def _normalize_headers(headers):
    """
    Returns a copy of ``headers`` with default ``Content-Type`` and
    ``User-Agent`` values filled in and every string value wrapped in a
    list, which is the form Twisted requires.  The caller's dictionary is
    never modified.

    :raises NotImplementedError:
        Raised if a header value is not a string, list, tuple or set.
    """
    headers = dict(headers or {})
    headers.setdefault("Content-Type", ["application/json"])
    headers.setdefault("User-Agent", [USERAGENT])

    # Twisted requires lists for header values
    for header, value in list(headers.items()):
        if isinstance(value, STRING_TYPES):
            headers[header] = [value]
        elif not isinstance(value, (list, tuple, set)):
            raise NotImplementedError(
                "Cannot handle header values with type %s" % type(value))

    return headers


def _encode_data(data, headers):
    """
    Serializes request ``data`` according to the ``Content-Type`` in
    ``headers``.  Dictionaries are passed through :func:`json_safe` first.
    Returns :data:`NOTSET` unchanged when no data was provided.

    :raises NotImplementedError:
        Raised if data was provided but the content type is not
        ``application/json``.
    """
    if isinstance(data, dict):
        data = json_safe(data)

    if data is NOTSET:
        return data

    if list(headers["Content-Type"]) == ["application/json"]:
        return json.dumps(data)

    raise NotImplementedError(
        "Don't know how to dump data for %s" % headers["Content-Type"])


def request(method, url, **kwargs):
    """
    Wrapper around :func:`treq.request` with some added arguments
    and validation.

    :param str method:
        The HTTP method to use when making the request.  Must be one of
        ``HEAD``, ``GET``, ``POST``, ``PUT``, ``PATCH`` or ``DELETE``.

    :param str url:
        The url this request will be made to.  Must use the ``http`` or
        ``https`` scheme and include a hostname and a path.

    :keyword data:
        The data to send along with some types of requests such as
        ``POST`` or ``PUT``.  The data is encoded with
        :func:`json.dumps` (dictionaries are passed through
        :func:`json_safe` first), so it must be JSON serializable and
        the ``Content-Type`` must be ``application/json``.

    :keyword dict headers:
        The headers to send along with the request to ``url``.  Currently
        only single values per header are supported.  The dictionary
        passed in is not modified.

    :keyword bool direct:
        When True the :func:`treq.request` deferred is returned directly
        (with logging callbacks attached) and ``callback``, ``errback``
        and ``response_class`` may not be used.

    :keyword function callback:
        The function to deliver an instance of :class:`Response`
        once we receive and unpack a response.  Required unless
        ``direct`` is True.

    :keyword function errback:
        The function to deliver an error message to.  By default
        this will use :func:`.log.err`.

    :keyword class response_class:
        The class to use to unpack the internal response.  This is mainly
        used by the unittests but could be used elsewhere to add some
        custom behavior to the unpack process for the incoming response.

    :raises NotImplementedError:
        Raised whenever a request is made of this function that we can't
        implement such as an invalid http scheme, request method or a
        problem constructing data to an api.
    """
    assert isinstance(url, STRING_TYPES)
    direct = kwargs.pop("direct", False)

    if method not in HTTP_METHODS:
        raise NotImplementedError("Unsupported HTTP method %r" % (method, ))

    # We only support http[s]
    parsed_url = urlparse(url)
    if not parsed_url.hostname:
        raise NotImplementedError("No hostname present in url")

    if not parsed_url.path:
        raise NotImplementedError("No path provided in url")

    if parsed_url.scheme not in HTTP_SCHEMES:
        raise NotImplementedError(
            "Unsupported url scheme %r" % (parsed_url.scheme, ))

    if not direct:
        # Snapshot the caller's arguments before headers and data are
        # rewritten below so Request.retry() can replay them.
        original_request = Request(
            method=method, url=url, kwargs=ImmutableDict(kwargs.copy()))

    headers = _normalize_headers(kwargs.pop("headers", None))
    data = _encode_data(kwargs.pop("data", NOTSET), headers)

    # prepare keyword arguments
    kwargs.update(
        headers=headers,
        persistent=config["agent_http_persistent_connections"])

    if data is not NOTSET:
        kwargs.update(data=data)

    if direct:
        # We don't support these with direct request
        # types.
        assert "callback" not in kwargs
        assert "errback" not in kwargs
        assert "response_class" not in kwargs

        # Construct the request and attach some loggers
        # to callback/errback.
        uid = uuid4()
        treq_request = treq.request(method, url, **kwargs)
        treq_request.addCallback(HTTPLog.response, uid=uid)
        treq_request.addErrback(HTTPLog.error, uid=uid, method=method, url=url)
        return treq_request

    callback = kwargs.pop("callback", None)
    errback = kwargs.pop("errback", log.err)
    response_class = kwargs.pop("response_class", Response)

    # check assumptions for keywords
    assert callback is not None, "callback not provided"
    assert callable(callback) and callable(errback)
    assert data is NOTSET or \
           isinstance(data, tuple(list(STRING_TYPES) + [dict, list]))

    def unpack_response(response):
        deferred = Deferred()
        deferred.addCallback(callback)

        # Deliver the body onto an instance of the response
        # object along with the original request.  Finally
        # the request and response via an instance of `Response`
        # to the outer scope's callback function.
        response.deliverBody(
            response_class(deferred, response, original_request))

        return deferred

    debug_kwargs = kwargs.copy()
    debug_url = build_url(url, debug_kwargs.pop("params", None))
    logger.debug(
        "Queued %s %s, kwargs: %r", method, debug_url, debug_kwargs)

    try:
        deferred = treq.request(method, quote_url(url), **kwargs)
    except NotImplementedError:  # pragma: no cover
        logger.error(
            "Attempting to access a url over SSL but you don't have the "
            "proper libraries installed.  Please install the PyOpenSSL and "
            "service_identity Python packages.")
        raise

    deferred.addCallback(unpack_response)
    deferred.addErrback(errback)

    return deferred
# Old style requests.  These are in place so we can improve on the agent
# without having to rewrite everything at once.
# TODO: remove these once everything is converted to *_direct
head, get, post, put, patch, delete = (
    partial(request, verb)
    for verb in ("HEAD", "GET", "POST", "PUT", "PATCH", "DELETE"))

# New style requests which we can utilize on a case by case basis.
# TODO: rename these to the above functions once everything is using them
(head_direct, get_direct, post_direct,
 put_direct, patch_direct, delete_direct) = (
    partial(request, verb, direct=True)
    for verb in ("HEAD", "GET", "POST", "PUT", "PATCH", "DELETE"))