Source code for pyfarm.jobtypes.core.process

# No shebang line, this module is meant to be imported
#
# Copyright 2013 Oliver Palmer
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Process
--------

Module responsible for connecting a Twisted process object and
a job type.  Additionally this module contains other classes which
are useful in starting or managing a process.
"""

import os
from uuid import uuid4

from psutil import Process, NoSuchProcess
from twisted.internet import reactor
from twisted.internet.protocol import ProcessProtocol as _ProcessProtocol

from pyfarm.agent.logger import getLogger
from pyfarm.jobtypes.core.log import STDOUT, STDERR

logger = getLogger("jobtypes.process")


class ReplaceEnvironment(object):
    """
    A context manager which will replace ``os.environ``'s, or dictionary of
    your choosing, for a short period of time.  After exiting the
    context manager the original environment will be restored.

    This is useful if you have something like a process that's using
    global environment and you want to ensure that global environment is
    always consistent.

    :param dict frozen_environment:
        The key/value pairs which should make up the environment while
        the context manager is active.

    :param dict environment:
        If provided, use this as the environment dictionary instead
        of ``os.environ``
    """
    def __init__(self, frozen_environment, environment=None):
        if environment is None:
            environment = os.environ

        self.environment = environment
        self.original_environment = None
        self.frozen_environment = frozen_environment

    def __enter__(self):
        """
        Swap the contents of ``environment`` for ``frozen_environment``.

        :returns: This context manager instance.
        """
        # Snapshot the replacement *before* touching the live environment.
        # This protects against two failure modes:
        #   * ``frozen_environment`` being the very same mapping as
        #     ``environment`` - clearing first would leave nothing to copy.
        #   * ``frozen_environment`` not being a usable mapping at all - we
        #     want to fail before anything has been modified.
        replacement = dict(self.frozen_environment)

        self.original_environment = self.environment.copy()
        self.environment.clear()
        try:
            self.environment.update(replacement)
        except Exception:
            # __exit__ is never called when __enter__ raises, so put the
            # original contents back ourselves before propagating the error.
            self.environment.clear()
            self.environment.update(self.original_environment)
            raise

        return self

    def __exit__(self, *_):
        """Restore the contents ``environment`` had when the block began."""
        self.environment.clear()
        self.environment.update(self.original_environment)
class ProcessProtocol(_ProcessProtocol):
    """
    Subclass of Twisted's
    :class:`twisted.internet.protocol.ProcessProtocol` which hooks into the
    various systems necessary to run and manage a process.  More
    specifically, this helps to act as plumbing between the process being
    run and the job type.

    :param jobtype:
        The job type object to notify.  This class calls its
        ``_process_started``, ``_process_stopped`` and ``_process_output``
        methods.
    """
    def __init__(self, jobtype):
        self.jobtype = jobtype

        # Internal only attributes, __uuid itself is a property so
        # it can't be accidentally modified later.
        self.__ended = False
        self.__uuid = uuid4()

    def __repr__(self):
        return \
            "ProcessProtocol(uuid=%s, pid=%r)" % (self.uuid, self.pid)

    @property
    def uuid(self):
        """The unique id generated for this protocol instance"""
        return self.__uuid

    @property
    def pid(self):
        """The pid reported by the transport or ``None`` if unavailable"""
        # Transport may not have been setup if the process failed to start.
        try:
            return self.transport.pid
        except AttributeError:
            return None

    @property
    def process(self):
        """The underlying Twisted process object"""
        return self.transport

    @property
    def psutil_process(self):
        """
        Returns a :class:`psutil.Process` object for the running process
        or ``None`` if the process has ended, has no pid or can no longer
        be found.
        """
        # It's possible that the process could have
        # ended but not died yet.  So we have this
        # check just in case this property is called
        # during this state.
        if self.__ended:
            return None

        # psutil interprets a pid of ``None`` as "the current process".
        # Without this check we would hand back an object for whatever
        # process is hosting this protocol instead of the child.
        pid = self.pid
        if pid is None:
            return None

        try:
            return Process(pid=pid)
        except NoSuchProcess:  # pragma: no cover
            return None

    def connectionMade(self):
        """
        Called when the process first starts and the file descriptors
        have opened.
        """
        self.jobtype._process_started(self)

    def processEnded(self, reason):
        """
        Called when the process has terminated and all file descriptors
        have been closed.  :meth:`processExited` is called, too, however we
        only want to notify the parent job type once the process has freed
        up the last bit of resources.

        :param reason:
            Passed through unchanged to ``jobtype._process_stopped``.
        """
        try:
            self.__ended = True
            self.jobtype._process_stopped(self, reason)
        except Exception as e:
            logger.error("Exception caught while running "
                         "jobtype._process_stopped: %s", e)

    def outReceived(self, data):
        """Called when the process emits on stdout"""
        try:
            self.jobtype._process_output(self, data, STDOUT)
        except Exception as e:
            logger.error(
                "Exception caught while handling STDOUT in "
                "jobtype._process_output: %s", e)

    def errReceived(self, data):
        """Called when the process emits on stderr"""
        try:
            self.jobtype._process_output(self, data, STDERR)
        except Exception as e:
            logger.error(
                "Exception caught while handling STDERR in "
                "jobtype._process_output: %s", e)

    def kill(self):
        """
        Kills the underlying process, if running.  Any child processes
        found at the time of the call are killed two seconds later.
        """
        logger.info("Killing %s", self)
        if not self.pid:
            logger.warning("Process has no pid, it has probably already "
                           "terminated. Not killing it.")
            return

        children = None
        try:
            process = self.psutil_process
            if not process:
                return

            # Collect the children before killing the parent, while the
            # parent is still around to be queried.
            children = process.children(recursive=True)
            process.kill()
        except Exception as e:  # pragma: no cover
            logger.warning("Cannot kill %s: %s.", self, e)

        def kill_children(children):
            for child in children:
                try:
                    child.kill()
                except NoSuchProcess:
                    # We don't care about that
                    pass
                except Exception as e:  # pragma: no cover
                    # Keep going so one stubborn child does not prevent
                    # the rest of them from being killed.
                    logger.warning(
                        "Cannot kill child process %s: %s.", child, e)

        if children:
            reactor.callLater(2, kill_children, children)

    def terminate(self):
        """
        Terminates the underlying process, if running.  Any child
        processes found at the time of the call are terminated two
        seconds later.
        """
        logger.info("Terminating %s", self)
        if not self.pid:
            logger.warning("Process has no pid, it has probably already "
                           "terminated. Not terminating it.")
            return

        children = None
        try:
            process = self.psutil_process
            if not process:
                return

            # Collect the children before terminating the parent, while
            # the parent is still around to be queried.
            children = process.children(recursive=True)
            process.terminate()
        except Exception as e:  # pragma: no cover
            logger.warning("Cannot terminate %s: %s.", self, e)

        def terminate_children(children):
            for child in children:
                try:
                    child.terminate()
                except NoSuchProcess:
                    pass
                except Exception as e:  # pragma: no cover
                    # Keep going so one stubborn child does not prevent
                    # the rest of them from being terminated.
                    logger.warning(
                        "Cannot terminate child process %s: %s.", child, e)

        if children:
            reactor.callLater(2, terminate_children, children)

    def interrupt(self):
        """Interrupts the underlying process, if running."""
        logger.info("Interrupt %s", self)
        try:
            self.process.signalProcess("INT")
        except Exception as e:  # pragma: no cover
            logger.warning("Cannot interrupt %s: %s.", self, e)

    def running(self):
        """
        Method to determine whether the child process is currently
        running.  This is ``True`` whenever :attr:`pid` is not ``None``.
        """
        return self.pid is not None