Make Sure Your Applications Crash

This work is licensed under a Creative Commons Attribution 3.0 Unported License.

Abstract

Applications can crash, hang or get stuck in infinite loops — even if they are written in Python. While it is good to make efforts to reduce their chances of crashing, it is also good to make sure that detection and recovery scenarios are in place for those times when something goes wrong.

Introduction

It is increasingly popular to write long-running server processes in Python. Especially on unattended appliances, it is important to detect crashes and hangs, and recover from them. In the words of A. L. Scherr, “a system failure can usually be considered to be the result of two program errors: the first, in the program that started the problem; the second, in the recovery routine that could not protect the system.” (“Functional Structure of IBM Virtual Storage Operating Systems, Part II: OS/VS-2 Concepts and Philosophies,” IBM Systems Journal, Vol. 12, No. 4.)

Failure modes are similar, no matter the application language. Segmentation faults are still possible — via bugs in the core Python interpreter or in native code modules. Bugs in Python applications themselves can cause crashes through untrapped exceptions (some exceptions, like MemoryError, are better off not trapped). Applications can also hang when a system call blocks forever. At the algorithm level, bugs like infinite loops or thread deadlocks can make a process unresponsive.

A higher-level failure mode, sometimes caused by trapping an unexpected exception, is a process that is in an inconsistent state and can no longer serve its purpose. Such processes appear alive to most crash-detection algorithms — for example, their mainloop appears to be running, and any heart-beat task will still run. This means that there must be application-level crash detectors for proper error recovery.
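
As a hedged sketch, an application-level detector can check for progress rather than mere liveness; the get_queue_length and get_completed_count helpers below are hypothetical stand-ins for whatever progress metric the application exposes:

## A sketch of an application-level progress check; the two helper
## functions are hypothetical application-specific metrics, and the
## problems/ directory anticipates the watchdog convention used
## later in this article.
import time

def progress_checker(get_queue_length, get_completed_count, interval=60):
    last_completed = get_completed_count()
    while True:
        time.sleep(interval)
        completed = get_completed_count()
        if get_queue_length() > 0 and completed == last_completed:
            # The mainloop may still be running, but no work is
            # getting done; leave a problem note for the recovery
            # machinery.
            fp = open('problems/worker', 'w')
            fp.write('no progress')
            fp.close()
        last_completed = completed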

Data Consistency

After a process crash, and when the process subsequently starts up again, the data might be in an inconsistent state: a process can crash in the middle of any update of permanent storage. The easiest way to ensure data consistency is to have none. Any process that can be written to rely on data from other sources, rather than maintaining its own, should be written that way. For performance reasons, it is sometimes useful to maintain a cache. As long as cache data can be reliably rebuilt, any recovery process can rebuild it, and not have to worry about its consistency. If permanent storage is necessary, it is best kept consistent by using atomic operations. The most popular way to do so is to use a transactional database, although care must be taken to consider the database’s own crash-recovery procedures. On a UNIX system, taking advantage of the fact that a file rename is atomic can be useful:

import os

def update_counter():
    fp = file("counter.txt")
    s = fp.read()
    counter = int(s.strip())
    counter += 1
    # If there is a crash before this point,
    # no changes have been done.
    fp = file("counter.txt.tmp", 'w')
    print >>fp, counter
    fp.close()
    # If there is a crash before this point,
    # only a temp file has been modified
    # The following is an atomic operation
    os.rename("counter.txt.tmp", "counter.txt")

However, the last line would not work correctly on a Windows system — os.rename cannot replace an existing file there. In cases such as these, it is possible to let the data become inconsistent, as long as any inconsistency can be detected at recovery time:

def update_counter():
    fp = file("counter.txt")
    s = fp.read()
    counter = int(s.strip())
    counter += 1
    # If there is a crash before this point,
    # no changes have been done.
    fp = file("counter.txt.tmp", 'w')
    print >>fp, counter
    fp.close()
    # If there is a crash before this point,
    # only a temp file has been modified
    os.remove("counter.txt")
    # At this point, the state is inconsistent*
    # The following is an atomic operation
    os.rename("counter.txt.tmp", "counter.txt")

def recover():
    if not os.path.exists("counter.txt"):
        # The permanent file has been removed
        # Therefore, the temp file is valid
        os.rename("counter.txt.tmp", "counter.txt")

In production code, we would also remove the spurious temp file if it was written but the counter file had not yet been removed — but this is not strictly needed. The only necessary thing is to fix the consistency problem if a crash happened at the point marked with an asterisk.
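
For completeness, a sketch of a recovery routine that also removes that leftover temp file (the extra branch is the optional part):

def recover():
    if not os.path.exists("counter.txt"):
        # The permanent file has been removed, so the temp file
        # holds the new, valid value.
        os.rename("counter.txt.tmp", "counter.txt")
    elif os.path.exists("counter.txt.tmp"):
        # We crashed after writing the temp file but before
        # removing counter.txt; the old value is still valid,
        # so the temp file is merely clutter.
        os.remove("counter.txt.tmp")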

A different technique is to use versioning, and have a janitorial process take care of obsolete versions:

def update_counter():
    files = [int(name.split('.')[-1])
                  for name in os.listdir('.')
                      if name.startswith('counter.')]
    last = max(files)
    counter = int(file('counter.%s' % last).read().strip())
    counter += 1
    # If there is a crash before this point,
    # no changes have been done.
    fp = file("tmp.counter", 'w')
    print >>fp, counter
    fp.close()
    # If there is a crash before this point,
    # only a temp file has been modified
    os.rename('tmp.counter', 'counter.%s' % (last+1))
    os.remove('counter.%s' % last)

# This is not a recovery routine, but a cleanup routine
# Even in its absence, the state is consistent
def cleanup():
    files = [int(name.split('.')[-1])
                  for name in os.listdir('.')
                      if name.startswith('counter.')]
    files.sort()
    files.pop()
    for n in files:
        os.remove('counter.%d' % n)
    if os.path.exists('tmp.counter'):
        os.remove('tmp.counter')

A variant on this method allows us to keep key/value stores in rotating log files:

0.log:
  ['add', 'key-0', 'value-0']
  ['add', 'key-1', 'value-1']
  ['add', 'key-0', 'value-2']
  ['remove', 'key-1']
  .
  .
  .

1.log:
  .
  .
  .

2.log:
  .
  .
  .

The code for dealing with those files is simple, and needs little special recovery:

## First, some utility functions:
import json
import os

## Get the level of a file
def getLevel(s):
    return int(s.split('.')[0])

## Get all files of a given type
def getType(files, tp):
    return [(getLevel(s), s)
                 for s in files if s.endswith(tp)]

## Get all relevant files
def relevant(d):
    files = os.listdir(d)
    masters = getType(files, '.master')
    if masters:
        mlevel, master = max(masters)
        result = [master]
    else:
        mlevel, result = -1, []
    logs = getType(files, '.log')
    logs.sort()
    return result + [log for llevel, log in logs if llevel > mlevel]

## Read in a single file
def update(result, fp):
    for line in fp:
        value = json.loads(line)
        if value[0] == 'add':
            result[value[1]] = value[2]
        else:
            del result[value[1]]

## Read in several files
def read(files):
    result = dict()
    for fname in files:
        try:
            update(result, file(fname))
        except ValueError:
            pass
    return result

## The actual data store abstraction.
## For simplicity, it assumes the store directory is the
## current working directory.
class Store(object):
    def __init__(self, d='.'):
        files = relevant(d)
        self.result = read(files)
        self.fp = None
        self.level = getLevel(files[-1]) if files else 0
        self._next()
    def _next(self):
        # Start writing a new log file at the next level.
        self.level += 1
        if self.fp:
            self.fp.close()
        self.fp = file('%d.log' % self.level, 'w')
        self.rows = 0
    def get(self, key):
        return self.result[key]
    def add(self, key, value):
        self.result[key] = value
        self._write(['add', key, value])
    def remove(self, key):
        del self.result[key]
        self._write(['remove', key])
    def _write(self, value):
        print >>self.fp, json.dumps(value)
        self.fp.flush()
        self.rows += 1
        if self.rows > 200:
            self._next()

## This should be run periodically from a different thread
def compress(d):
    files = relevant(d)[:-1]
    if len(files) < 2:
        return
    result = read(files)
    # The new master summarizes everything up to and including
    # the last fully-written log.
    master = getLevel(files[-1])
    fp = file('%d.master.tmp' % master, 'w')
    for key, value in result.iteritems():
        print >>fp, json.dumps(['add', key, value])
    fp.close()
    # Renaming is atomic, so a half-written master is never visible.
    os.rename('%d.master.tmp' % master, '%d.master' % master)

If we need to keep an efficient cache of this data, we can rebuild the cache only in cases where it might be inconsistent. For example, we can wrap the store in a function that notes cache inconsistency, then updates the cache, and then removes the inconsistency note. If the inconsistency note is present at start-up, we reinitialize the cache.
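
A hedged sketch of that wrapping, where update_cache and rebuild_cache are hypothetical application-specific helpers:

## A sketch of the "inconsistency note" idea; update_cache and
## rebuild_cache are hypothetical.
def update_store_and_cache(store, key, value):
    open('cache.dirty', 'w').close()    # note: the cache may be stale
    store.add(key, value)
    update_cache(key, value)
    os.remove('cache.dirty')            # the cache is consistent again

def recover_cache():
    if os.path.exists('cache.dirty'):
        # We crashed somewhere in the middle; rebuild from the store.
        rebuild_cache()
        os.remove('cache.dirty')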

If we are using a database with no transaction support, such as Redis or the log/master store above, and we need to modify multiple things, we can take advantage of careful operation ordering, and again add code for the recovery case:

def activate_due():
    scheduled = rs.smembers('scheduled')
    now = time.time()
    for el in scheduled:
        due = int(rs.get(el+':due'))
        if now<due:
            continue
        rs.sadd('activated', el)
        rs.delete(el+':due')
        rs.srem('scheduled', el)

def recover():
    inconsistent = rs.sinter('activated', 'scheduled')
    for el in inconsistent:
        rs.delete(el+':due') #*
        rs.srem('scheduled', el)

Note that keeping the order of operations is important even in the recovery code — if the line marked with an asterisk were to follow, rather than precede, the next line, a crash between the two would result in an element which is not scheduled but which does have a “due” attribute.

After using some subset of the techniques above to ensure data consistency, it is best to avoid scheduling tasks for “process shutdown”. This way, all process shutdowns, planned or not, behave the same way and are therefore easier to test. Indeed, such an attitude will lead to more bad-shutdown bugs being discovered during testing, and therefore more bad-shutdown bugs being fixed. Together with a strong attempt to use mocking to make sure that every line inside the critical routines that handle data modification can be a crash point, correct behavior in the face of arbitrary shutdowns can be verified.
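
One hedged sketch of such a test, using the mock library to turn the final rename in the second update_counter into a crash point and then checking that recover() repairs the state:

## A sketch of crash-point testing with the mock library
## (unittest.mock in newer Pythons).  It assumes a counter.txt
## file exists, and targets the second update_counter above.
import mock

def test_crash_before_final_rename():
    # Make the final rename blow up, simulating a crash between
    # os.remove and os.rename.
    with mock.patch('os.rename', side_effect=OSError('simulated crash')):
        try:
            update_counter()
        except OSError:
            pass
    recover()
    assert os.path.exists("counter.txt")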

Availability

After the data consistency problem is dealt with, the only result of a process crash is temporary unavailability of functionality. At worst, the user could restart the system entirely. The two problems with such a scenario are that user intervention should not be assumed, and that such a method of recovery would take too long — both the time the user takes to notice the problem and the reboot time. Other than attempting to avoid crashes, increasing availability means decreasing the effect crashes have on the behavior of the system. This is done by limiting the impact of a crash, detecting a crash as soon as possible, and recovering from a crash as quickly as possible after it has been detected.

Reducing crash impact can be done in two, not mutually exclusive, ways — so-called vertical and horizontal process splitting. Splitting processes vertically means running the same code in multiple processes, and allocating responsibility for tasks using some sort of allocator. The most famous example of such splitting is in network server processes, where each process handles some subset (sometimes only one) of the network connections. In those servers, problems in one process only affect the network connections handled by that process, while the other processes continue unaffected. However, any task can be divided in this way, as long as communication between the parallel execution streams is limited:

import os
import socket

def forking_server():
    s = socket.socket()
    s.bind(('', 8080))
    s.listen(5)
    while True:
        client, address = s.accept()
        newpid = os.fork()
        if not newpid:
            # Child process: serve this connection, then exit.
            f = client.makefile('w')
            f.write("Sunday, May 22, 1983 18:45:59-PST")
            f.close()
            os._exit(0)
        # Parent process: close its copy of the connection
        # and go back to accepting.
        client.close()

Splitting processes horizontally means running different code bases in different processes — letting each process deal with one sort of task, and communicate its results, possibly, to other processes. Even in the example above, we saw such a division — the load balancer process only did load balancing, while the network server processes executed the application-specific code. We can stretch splitting further — in a web server, we often divide the processes into load-balancing, front-end web application, back-end web services and database services. A crash in the back-end will cause the front-end to retry connecting, and so the application will look slow, but not unavailable, to the end user.
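
That retry behaviour, which keeps the application merely slow rather than unavailable, can be a simple loop with a delay. A hedged sketch, where connect_to_backend is a hypothetical application helper:

## A sketch of front-end retry behaviour; connect_to_backend and
## the request object are hypothetical.
import time

def call_backend(request, retries=5, delay=0.5):
    for attempt in range(retries):
        try:
            return connect_to_backend().handle(request)
        except IOError:
            # The back end may be restarting after a crash.
            time.sleep(delay)
    raise IOError("back end unavailable")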

In many cases of splitting processes, we need some form of communication between the processes. If communication can be avoided, this minimizes the effects one process has on another. Often, direct communication can be avoided in favor of a shared store. While this introduces subtle data-update ordering concerns, a shared store (that properly recovers its data after crashes) lets two processes communicate implicitly: one process leaves data for another process to consume. An example is a back-end service that calculates information offline and feeds it into a database that the front end queries. Crashes in either the front-end or the back-end then do not impact the other process:

## Process one
from twisted.web import resource, server
from twisted.internet import reactor
from twisted.python import filepath

class SchedulerResource(resource.Resource):
    isLeaf = True
    def __init__(self, filepath):
        resource.Resource.__init__(self)
        self.filepath = filepath
    def render_PUT(self, request):
        uuid, = request.postpath
        content = request.content.read()
        self.filepath.child(uuid).setContent(content)
        return ''

# The "things" directory is assumed to exist.
s = server.Site(SchedulerResource(filepath.FilePath("things")))
reactor.listenTCP(8080, s)
reactor.run()

## Process two
import os
import time

import redis

rs = redis.Redis(host='localhost', port=6379, db=9)
while True:
    for uuid in os.listdir("things"):
        fname = os.path.join("things", uuid)
        when = int(file(fname).read().strip())
        rs.set(uuid+':due', when)
        rs.sadd('scheduled', uuid)
        os.remove(fname)
    time.sleep(1)

## Process three
import time

import redis

rs = redis.Redis(host='localhost', port=6379, db=9)
recover()  # the recovery routine from the previous section
while True:
    activate_due()
    time.sleep(1)

If implicit communication using shared storage is inadvisable, a message queue can often be helpful. Message queues have the advantage of being fairly reliable infrastructure, and of allowing application processes to connect and disconnect:

## Process four
import time

import pika
import redis

rs = redis.Redis(host='localhost', port=6379, db=9)
channel = pika.BlockingConnection(pika.ConnectionParameters(
              'localhost')).channel()
channel.queue_declare(queue='active')
while True:
    activated = rs.smembers('activated')
    finished = set(rs.smembers('finished'))
    for el in activated:
        if el in finished:
            continue
        channel.basic_publish(exchange='',
                              routing_key='active',
                              body=el)
        rs.sadd('finished', el)
    time.sleep(1)

## Process five
# It is possible to get "dups" of bodies.
# Application logic should deal with that.
import syslog

import pika

channel = pika.BlockingConnection(pika.ConnectionParameters(
              'localhost')).channel()
channel.queue_declare(queue='active')
def callback(ch, method, properties, el):
    syslog.syslog('Activated %s' % el)
channel.basic_consume(callback, queue='active', no_ack=True)
channel.start_consuming()

If point-to-point connections are desirable, web-based communications have the benefit of being easily restartable, especially when using RESTful APIs.
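
For example, because the PUT handled by process one above is idempotent, a client can simply retry it after either side restarts. A hedged sketch, assuming the scheduler resource from the shared-store example listens on port 8080:

## A sketch of a restartable RESTful client; retrying the idempotent
## PUT is safe because repeating it leaves the same state behind.
import httplib
import time

def schedule(uuid, when, retries=5):
    for attempt in range(retries):
        try:
            conn = httplib.HTTPConnection('localhost', 8080)
            conn.request('PUT', '/' + uuid, str(when))
            return conn.getresponse().status
        except (IOError, httplib.HTTPException):
            # The server may be restarting; wait and try again.
            time.sleep(1)
    raise IOError("scheduler unavailable")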

Especially in the case of horizontal splitting, it is crucial to make sure that any given service is available. Unavailability means either a front-end is not functioning, which causes user-visible failures, or some queue is not being processed properly. A basic approach is to monitor that the process itself is alive. On UNIX systems, this can be as simple as periodically sending the process signal 0 with kill, and checking that there is no error. However, such process liveness checks will not detect a process that is stuck in an infinite loop.
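
In Python, the signal-0 check can be a sketch like the following, where the pid is assumed to come from a pid file or a process manager:

## Liveness check via signal 0: the signal is never delivered, but
## sending it fails if the process does not exist.
import errno
import os

def is_alive(pid):
    try:
        os.kill(pid, 0)
    except OSError as e:
        # EPERM means the process exists but belongs to another user.
        return e.errno == errno.EPERM
    return True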

A better way is to send and detect heartbeats. The simplest heartbeats are files which are touched at regular intervals:

## In a Twisted process
import os

from twisted.internet import task

def beat():
    # Create the beat file if needed, and bump its mtime.
    file('beats/my-name', 'a').close()
    os.utime('beats/my-name', None)
task.LoopingCall(beat).start(30)

## Watchdog
import glob
import os
import subprocess
import time

while True:
    now = time.time()
    timeout = dict()
    # Each file in hearts/ holds the allowed heartbeat interval,
    # in seconds, for the service of the same name.
    for heart in glob.glob('hearts/*'):
        name = os.path.basename(heart)
        allowed = int(file(heart).read().strip())
        timeout[name] = now - allowed
    for name in timeout:
        beatfile = 'beats/' + name
        if not os.path.isfile(beatfile):
            mtime = 0
        else:
            mtime = os.path.getmtime(beatfile)
        problem = 'problems/' + name
        if (mtime < timeout[name] and
            not os.path.isfile(problem)):
            fp = file(problem, 'w')
            fp.write('watchdog')
            fp.close()
    problemTimeout = now - 30
    for problem in glob.glob('problems/*'):
        if os.path.getmtime(problem) < problemTimeout:
            subprocess.call(['restart-system'])
    time.sleep(1)

In this watchdog example, we assume that the actual process restart is done by some dedicated process manager. Our watchdog process is only responsible for detecting problems, and for restarting the whole system if the manager itself seems stuck. Note that a proper implementation of restart-system is non-trivial — it needs to track down and kill each part of the old system.
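
One hedged sketch of restart-system, assuming every part of the system writes its pid into a hypothetical pids/ directory at start-up, and that a hypothetical start-system script brings everything back up:

## A sketch of restart-system; the pids/ directory and the
## start-system script are assumptions, not part of the article.
import glob
import os
import signal
import subprocess

def restart_system():
    for pidfile in glob.glob('pids/*'):
        try:
            pid = int(open(pidfile).read().strip())
            os.kill(pid, signal.SIGKILL)
        except (ValueError, OSError):
            pass  # stale pid file, or the process is already gone
        os.remove(pidfile)
    subprocess.call(['start-system'])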

Note that the watchdog never tries to be the sole source of truth regarding problems — it merely checks that every problem is dealt with in a timely manner. This means that there is an opportunity for the system to include higher-level checkers — things that check liveness by, for example, sending test requests and checking that these are dealt with in a reasonable manner. If the test request hangs or fails, the checker can manufacture a problem.
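
A minimal sketch of such a checker, assuming the front end from earlier answers on port 8080 and that problem notes go into the same problems/ directory the watchdog scans; here, any HTTP response at all counts as alive:

## A sketch of a high-level checker that manufactures a problem note
## when a test request fails; the port and file names are assumptions.
import httplib

def check_frontend():
    try:
        conn = httplib.HTTPConnection('localhost', 8080, timeout=10)
        conn.request('GET', '/')
        conn.getresponse()  # any response at all proves liveness
    except (IOError, httplib.HTTPException):
        fp = open('problems/frontend', 'w')
        fp.write('checker')
        fp.close()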

On a real production system, the watchdog process would not run as its own long-lived loop — it would be invoked from something like cron, so that a single failed iteration would not take the watchdog itself down. For extra reliability, the watchdog command can end by touching a file. On Linux systems, the Linux “watchdog” daemon can be configured to reboot the machine if this file is not touched often enough.

If problems are properly detected, and hanging processes are restarted, it is good to have processes come back up as fast as possible. If at all possible, processes should not depend on each other during start-up — for example, consistency checks should usually precede connecting to the message queue, as this allows the message queue to come up while the consistency checks are running.
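
A hedged sketch of that start-up order, where check_consistency and serve are hypothetical application routines:

## Start-up ordering sketch: do local consistency checks first, then
## wait for the message queue; check_consistency and serve are
## hypothetical.
import time

import pika
import pika.exceptions

def start():
    check_consistency()
    while True:
        try:
            connection = pika.BlockingConnection(
                pika.ConnectionParameters('localhost'))
            break
        except pika.exceptions.AMQPConnectionError:
            time.sleep(1)
    serve(connection.channel())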

Conclusions

Everything can crash, and usually will. While fixing bugs to avoid crashes is a good idea, planning for the inevitable is even more important and can keep a system running with minimal user-visible interruption.
