Wed 20 August 2014

Dekoratoren in Python, erklärt in Farbe

Präambel

Dieser Artikel ist schon ein paar Jahre alt und war eigentlich nur für den Gebrauch bei meinem damaligen Arbeitgeber gedacht. Damit sich aber endlich mal etwas in diesem Blog tut, habe ich mich entschlossen, ihn dennoch zu veröffentlichen.

Note

Die Code-Beispiele sind in Python 2-Syntax gehalten. Das Übersetzen nach Python 3 wird dem Leser als Übung überlassen.

Was sind Dekoratoren?

Wie der Namen bereits vermuten lässt, dekorieren Dekoratoren etwas. Bei Python sind es Funktionen bzw. Methoden und seit Version 2.6/3.0 auch Klassen. Ein Beispiel für Dekoratoren:

@decorator
def spam():
    print "Ich bin spam."

Note

Ein Hinweis für Java-Entwickler: Die Syntax sieht zwar ähnlich aus wie Annotations in Java, hat aber ansonsten nichts mit Annotations in Java zu tun.

Was passiert jetzt, wenn man den obigen Code ausführt?

>>> @decorator
... def spam():
...     print "Ich bin spam."
...
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
NameError: name 'decorator' is not defined

Der NameError ist jetzt natürlich wenig überraschend, schließlich wurde der Dekorator decorator noch nicht definiert. Im Folgenden werden wir das nachholen:

def decorator(func):
    print "Ich bin ein Dekorator und dekoriere", func.__name__
    return func

Was passiert jetzt, wenn man den obigen Code ausführt?

>>> def decorator(func):
...     print "Ich bin ein Dekorator und dekoriere", func.__name__
...     return func
...
>>> @decorator
... def spam():
...     print "Ich bin spam."
...
Ich bin ein Dekorator und dekoriere spam
>>> spam
<function spam at 0x7f8d95b3c488>

Und was, wenn man die dekorierte Funktion ausführt?

>>> spam()
Ich bin spam.

Wie einfach zu erkennen ist, hat unser simpler Beispiel-Dekorator keine direkte Auswirkung auf die dekorierte Funktion.

Ich weiß jetzt noch immer nicht, was ein Dekorator ist

Ein Beispiel alleine erklärt natürlich nicht, was ein Dekorator ist. Was ist also ein Dekorator? Ein Dekorator ist nichts anderes als ein Callable, das implizit als Argument die zu dekorierende Funktion bzw. Methode bzw. Klasse übergeben bekommt. Der Rückgabewert des Dekorators wird dann an den Namen der Funktion bzw. Methode bzw. Klasse gebunden. Anders ausgedrückt:

@decorator
def spam():
    pass

ist nichts anderes als syntaktischer Zucker für

def spam():
    pass
spam = decorator(spam)

Das ist auch der Grund, warum unser Beispieldekorator decorator die Funktion explizit wieder zurück gibt.

Das, was nach dem @ folgt, muss jedoch nicht zwingend ein Name sein. Vielmehr kann es ein beliebiger Ausdruck sein. Der Ausdruck wird evaluiert und das Ergebnis des Ausdrucks ist dann der Dekorator. Um das zu verdeutlichen, folgt jetzt ein Beispiel für eine Dekoratoren-Factory:

def decorator_for(name):
    """Ich bin eine Factory und ich gebe einen Dekorator zurück."""
    print "Erzeuge einen Dekorator für", name
    # Den eigentlichen Dekorator (aus dem vorigen Beispiel) zurück geben
    return decorator

@decorator_for("Csaba")
def spam():
    print "Ich bin spam."

Und wieder die Frage, was passiert, wenn man den Code ausführt?

>>> def decorator_for(name):
...     """Ich bin eine Factory und ich gebe einen Dekorator zurück."""
...     print "Erzeuge einen Dekorator für", name
...     # Den eigentlichen Dekorator (aus dem vorigen Beispiel) zurück geben
...     return decorator
...
>>> @decorator_for("Csaba")
... def spam():
...     print "Ich bin spam."
...
Erzeuge einen Dekorator für Csaba
Ich bin ein Dekorator und dekoriere spam

Und was passiert, wenn man die dekorierte Funktion ausführt?

>>> spam()
Ich bin spam.

Wie zu erwarten, noch immer nichts spektakuläres.

Wann wird ein Dekorator ausgeführt?

Der aufmerksame Leser kann sich diese Frage bereits selbst beantworten: Der Dekorator wird nicht etwa bei jedem Aufruf der dekorierten Funktion bzw. Methode aufgerufen, sondern genau ein einziges Mal, nämlich dann, wenn die Funktion bzw. Methode deklariert wird.

Ich will aber, dass bei jedem Funktionsaufruf etwas passiert

Auch das kann sich der aufmerksame Leser bereits selber herleiten: Wie bereits erwähnt, wird der Rückgabewert des Dekorators an den Namen der Funktion gebunden, die dekoriert wird. Als Beispiel also jetzt ein Dekorator, der nicht explizit die dekorierte Funktion wieder zurückgibt.

def useless_decorator(func):
    return None
>>> @useless_decorator
... def spam():
...     print "Ich bin spam."
...
>>> spam()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: 'NoneType' object is not callable
>>> spam
>>>

Da der Dekorator None zurückgibt, wird auch None an den Namen der Funktion, also spam, gebunden und die ursprüngliche Funktion ging verloren.

Man kann jetzt natürlich nicht nur die Original-Funktion oder None zurück geben. Vielmehr kann man auch einen Wrapper zurück geben, der dann die ursprüngliche Funktion aufruft. Damit hat man dann das erreicht, was man wollte: Bei Jedem Funktionsaufruf soll etwas passieren.

def verbose_caller(func):
    print "Erzeuge einen Wrapper für die Funktion", func.__name__

    def wrapper():
        print "Rufe die Funktion", func.__name__, "auf"
        func()
    return wrapper
>>> @verbose_caller
... def spam():
...     print "Ich bin spam"
...
Erzeuge einen Wrapper für die Funktion spam
>>> spam()
Rufe die Funktion spam auf
Ich bin spam
>>> spam
<function wrapper at 0x7f8d95b3c758>

So gesehen ist der Dekorator in diesem Fall nichts anderes als eine Wrapper-Factory.

Da das immer noch ziemlich langweilig ist, wollen wir uns jetzt einen personalisierten Wrapper erzeugen. Dazu bauen wir eine Wrapper-Factory-Factory:

def verbose_caller_for(name):
   print "Erzeuge eine Wrapper-Factory für", name

   def verbose_caller(func):
       print "Erzeuge einen Wrapper für die Funktion", func.__name__

       def wrapper():
           print "Rufe die Funktion", func.__name__, "für", name, "auf"
           func()
       return wrapper
   return verbose_caller
>>> @verbose_caller_for("Csaba")
... def spam():
...     print "Ich bin spam"
...
Erzeuge eine Wrapper-Factory für Csaba
Erzeuge einen Wrapper für die Funktion spam
>>> spam()
Rufe die Funktion spam für Csaba auf
Ich bin spam

Da das ganze langsam doch recht unübersichtlich wird, überlegen wir uns als gute Entwickler, wie man das ganze verschönern könnte.

Klassenbasierte Dekoratoren

Dekoratoren sind einfach Callables, die das zu dekorierende Objekt entgegen nehmen. Instanzen von Klassen können jedoch ein Callable implementieren, über die spezielle Methode __call__. Warum also nicht einen Dekorator über eine Klasse implementieren? Als passionierte Java-Entwickler wissen wir, dass Klassen alles übersichtlicher machen.

class PersonalizedVerboseCaller(object):
    def __init__(self, name):
        self.name = name

    def __call__(self, func):
        return self.decorate(func)

    def decorate(self, func):
        """Wrapper factory."""
        print "Erzeuge einen Wrapper für die Funktion", func.__name__
        def wrapper():
            print "Rufe die Funktion", func.__name__, "für", self.name, "auf"
            func()
        return wrapper
>>> @PersonalizedVerboseCaller("Csaba")
... def spam():
...     print "Ich bin spam"
...
Erzeuge einen Wrapper für die Funktion spam
>>> spam()
Rufe die Funktion spam für Csaba auf
Ich bin spam

Sun 11 November 2012

Automatisiert Refleaks in Unittests erkennen

Wenn man händisch CPython-Erweiterungen mit Hilfe der C-API baut (und nicht etwa Cython oder Ähnliches benutzt), läuft man leider sehr leicht Gefahr, dass man versehentlich Reference Leaks einbaut.

Um das Auffinden von Refleaks zu erleichtern, zählt CPython bei einem Debug-Build die Summe aller Referenzen:

Python 3.4.0a0 (default:9214f8440c44, Nov 11 2012, 00:15:25)
[GCC 4.7.1] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>
[60488 refs]
>>> n = 42
[60490 refs]

Die Summe der Referenzen ist die Zahl zwischen den []. In diesem Beispiel hat sich die Gesamtanzahl der Referenzen um zwei erhöht, weil die Zahl 42 an zwei Namen gebunden wurde: einmal an n und einmal an _, weil Python im interaktiven Modus immer unter _ eine Referenz auf den letzten Wert hält.

Die Gesamtanzahl der Referenzen kann man in Debug-Builds auch programmatisch über die Funktion sys.gettotalrefcount erhalten. Also liegt es nahe, dass man versucht, Refleaks automatisch zu finden, beispielsweise, indem man die Gesamtanzahl der Referenzen vor und nach einem Testlauf betrachtet.

Die Idee ist hierbei recht einfach: Wird die Test-Suite mit einem Debug-Python ausgeführt, wird jeder Test fünf mal ausgeführt. Bei den letzten zwei Durchläufen wird dann die Gesamtzahl der Referenzen vor und nach dem Test verglichen. Die drei Durchläufe vorher werden nicht ausgewertet, damit sich die Zahl der Referenzen erst auf einen Wert einpendeln kann (beispielsweise, wenn Caches im Test eine Rolle spielen etc.).

Im Folgenden eine simple Umsetzung der Idee für das klassische unittest-Modul:

hunt_leaks = hasattr(sys, "gettotalrefcount")
if hunt_leaks:
    import gc

def _is_not_suite(test):
    try:
        iter(test)
    except TypeError:
        return True
    return False


def _cleanup():
    sys._clear_type_cache()
    gc.collect()

def _hunt(test):
    def test_wrapper(*args, **kwargs):
        deltas = []
        _cleanup()
        for i in xrange(5):
            before = sys.gettotalrefcount()
            test(*args, **kwargs)
            _cleanup()
            after = sys.gettotalrefcount()
            if i > 2:
                deltas.append(after - before)
        if any(deltas):
            print("{0!r} leaks: {1}".format(test, deltas))
    return test_wrapper

class TestSuite(unittest.TestSuite):
    def __iter__(self):
        for test in super(TestSuite, self).__iter__():
            if hunt_leaks and _is_not_suite(test):
                yield _hunt(test)
            else:
                yield test

Anzumerken ist hierbei, dass bei umfangreichen Testsuites _cleanup noch erweitert werden muss. Beispielsweise muss man die ABC-Registry aufräumen, Caches (re, struct, urllib, linecache) und warnings müssen aufgeräumt werden, etc.

Jetzt muss man nur noch unittest beibringen, dass es die obige TestSuite benutzen soll. Benutzt man keinen besonderen Test-Runner, kann man das beispielsweise einfach wie folgt tun:

def test():
    loader = unittest.TestLoader()
    loader.suiteClass = TestSuite
    unittest.main(testLoader=loader)

if __name__ == "__main__":
    test()

Reference Leaks können natürlich auch mit "reinem" Python-Code passieren, indem man globalen Zustand verändert. Ein etwas konstruiertes Beispiel:

class Observable(object):
    def __init__(self):
        self.observers = []

    def add_observer(self, callable):
        self.observers.append(callable)

    def notify_observers(self, value):
        for observer in self.observers:
            observer(value)

value = Observable()

class SpamTest(unittest.TestCase):
    def test_observable(self):
        class Observer(object):
            def __init__(self):
                self.called = False

            def __call__(self, value):
                self.called = True

        observer = Observer()
        value.add_observer(observer)
        value.notify_observers(42)
        self.assertTrue(observer.called)

Führt man die Tests jetzt aus, führt das zu folgender Ausgabe:

.....<__main__.SpamTest testMethod=test_observable> leaks: [40, 40]

----------------------------------------------------------------------
Ran 5 tests in 0.007s

Jetzt weiß man zumindest, dass der Test test_observable leckt. Was genau leckt, muss man aber immer noch selbst herausfinden. Was nicht unbedingt immer leicht und offensichtlich ist.

Tue 11 January 2011

nonlocal in Python 2.x

In Python 3 gibt es ein neues Statement nonlocal, das es erlaubt, bei verschachtelten Funktionen in der inneren Funktion einen neuen Wert an einen lokalen Namen der äußeren Funktion zu binden. In Python 2 war es nur möglich, sich auf einen lokalen Namen der äußeren Funktion zu beziehen.

Note

Der größte Teil dieses Blogeintrags dürften Implementierungsdetails von CPython sein und müssen keinesfall für andere Python-Implementierungen gelten.

Dazu schauen wir uns als erstes eine Funktion näher an:

def f():
    local_name = 42

(In Python implementierte) Funktionien haben in CPython ein sogenanntes Code-Objekt, das als func_code-Attribut (in Python 3 als __code__-Attribute) an die Funktion gebunden ist. In ihm sind der Bytecode, alle verwendeten lokalen Namen und Konstanten und andere Informationen gespeichert. All das ist direkt in Python selbst benutzbar und sollte durchaus einmal im interaktiven Interpreter ausprobiert werden. Der generierte Bytecode für diese Funktion sieht wie folgt aus (die Ausgabe wurde mit dem dis-Modul von Python gemacht):

2           0 LOAD_CONST               1 (42)
            3 STORE_FAST               0 (local_name)
            6 LOAD_CONST               0 (None)
            9 RETURN_VALUE

Dabei ist die 2 in der ersten Spalte die Zeilennummer, die Spalte rechts davon ist der Offset des Opcodes im Bytecode, gefolgt vom Opcode selbst und (falls vorhanden) einem Argument. In Klammern steht dann jeweils, was das Argument bedeutet.

Im konkreten Fall für die Funktion f wird also zunächst die Konstante 1 geladen, in unserem Fall ist das 42. 42 ist hierbei im co_consts-Attribut des Code-Objekts gespeichert. Diese 42 wird dann auf einem internen Stack der CPython-VM abgelegt. Der nächste Opcode, STORE_FAST, speichert den obersten Wert vom Stack in die lokale Variable local_name. Lokale Variablen werden dabei einfach als Array im aktuellen Frame umgesetzt (wobei dieses Array nicht von Python aus erreichbar ist). Der Opcode speichert also einfach den Wert als ersten Eintrag im Array. Falls jemand auf den lokalen Namensraum als Dict zugreifen wird (etwa über locals() oder das f_locals-Attribut eines Frames), wird das Dict des lokalen Namenraumes dann einfach mit den Werten aus dem Array aktualisiert. Die benötigten Informationen, nämlich die Namen der lokalen Variablen, finden sich im co_varnames-Attribut des Code-Objekts.

Als nächstes wird dann die Konstante None geladen und zurückgegeben (da eine Funktion in Python ja implizit None zurückgibt, wenn kein anderer Rückgabewert angebeben wurde).

Als nächstes betrachten wir eine Funktion, die eine geschachtelte Funktion zurückgibt, die einen Namen der äußeren Funktion benutzt:

def outer():
    outer_name = 42

    def inner():
        return outer_name
    return inner

Der Bytecode für outer():

2           0 LOAD_CONST               1 (42)
            3 STORE_DEREF              0 (outer_name)

4           6 LOAD_CLOSURE             0 (outer_name)
            9 BUILD_TUPLE              1
           12 LOAD_CONST               2 (<code object inner at 0x7ff7fe792cd8, file "<stdin>", line 4>)
           15 MAKE_CLOSURE             0
           18 STORE_FAST               0 (inner)

6          21 LOAD_FAST                0 (inner)
           24 RETURN_VALUE

Und für inner():

5           0 LOAD_DEREF               0 (outer_name)
            3 RETURN_VALUE

Was hat sich geändert? Zunächst einmal fällt auf, dass in outer() nicht mehr STORE_FAST zum Speichern der lokalen Variable benutzt wird, sondern STORE_DEREF. Das liegt daran, dass outer_name von der inneren Funktion benutzt wird. Anstatt in einem Array wird der Wert jetzt in einem sogenannten Cell-Objekt gespeicher (wobei diese Cell-Objekte wiederum auch in einem Array im Frame gespeichert werden). outer_name ist auch nicht mehr in co_varnames aufgelistet, sondern in co_cellvars. Als nächstes wird dieses Cell-Objekt dann auf den Stack geladen (mit LOAD_CLOSURE) und in ein Tupel gepackt (BUILD_TUPLE). Dieses Tupel bildet dann das func_closure-Attribut der neuen Funktion inner(), die mit MAKE_CLOSURE erstellt wird. Schließlich wird die neu erstellte Funktion, die sich oben auf dem Stack befindet, mit STORE_FAST an den lokalen Namen inner gebunden, wieder geladen und zurückgegeben.

In inner() wird einfach der Wert aus dem Cell-Objekt geladen, das aus func_closure genommen wird und zurückgegeben. Genau genommen werden die Cell-Objekte aus func_closure zum Array der Cell-Objekte im Frame hinzugefügt, wenn das Frame erstellt wird.

Was passiert jetzt, wenn man in inner() outer_name einen Wert zuweist?

def outer():
    outer_name = 42

    def inner():
        outer_name = 43

    return inner
5           0 LOAD_CONST               1 (43)
            3 STORE_FAST               0 (outer_name)
            6 LOAD_CONST               0 (None)
            9 RETURN_VALUE

Wie man sieht, wird outer_name in inner() automatisch zu einer lokalen Variable, der Wert in outer() selbst wird nicht geändert. Versucht man vorher außerdem auf outer_name zuzugreifen, erhält man einen UnboundLocalError.

Für diesen Fall bietet Python 3 das neue nonlocal-Statement. Damit kann man in inner() sagen, dass man den Wert in outer() ändern mag:

def outer():
    outer_name = 42

    def getter():
        return outer_name

    def increaser(n):
        nonlocal outer_name
        outer_name += n

    return (getter, increaser)

(getter, increaser) = outer()
print(getter())
increaser(42)
print(getter())

Führt man den Code aus, erhält man die folgende Ausgabe:

42
84

Doch wie funktioniert nonlocal? Schauen wir uns dazu wieder den Bytecode an. Von getter():

5           0 LOAD_DEREF               0 (outer_name)
            3 RETURN_VALUE

Wenig überraschend hat sich hier nichts geändert. Und der Bytecode von increaser()?

9           0 LOAD_DEREF               0 (outer_name)
            3 LOAD_FAST                0 (n)
            6 INPLACE_ADD
            7 STORE_DEREF              0 (outer_name)
           10 LOAD_CONST               0 (None)
           13 RETURN_VALUE

Wie man sieht, wird zum Laden der Variable wieder LOAD_DEREF benutzt. Was jetzt aber interessant ist: Zum Speichern wird STORE_DEREF benutzt, also derselbe Opcode, der auch in outer() benutzt wird. Was ja auch eigentlich logisch ist, denn in beiden Fällen wird der Wert in ein Cell-Objekt geschrieben -- sogar in dasselbe Cell-Objekt. Das bringt uns also zur folgenden Erkenntnis: Das nonlocal-Statement wäre auch problemlos in Python 2.x möglich, es fehlt nur die Syntax dafür (und natürlich auch die Compiler-Unterstützung).

Also werden wir im Folgenden probieren, das nonlocal-Statement in Python 2.x umzusetzen. Dazu müssen in der inneren Funktion alle STORE_FAST-Opcodes für einen nonlocal-Namen durch STORE_DEREF ersetzt werden und alle LOAD_FAST durch LOAD_DEREF. Außerdem müssen die Namen dann zu co_freevars hinzugefügt werden. Man kann ein Code-Objekt jedoch nicht einfach so verändern, sondern muss ein neues erstellen. Da man Funktionen verändert, drängt sich ein Dekorator geradezu auf:

def nonlocal(*args):
    def decorator(f):
        code = Code.from_code(f.func_code)
        code.freevars.extend(args)
        for (i, (op, arg)) in enumerate(code.code):
            if op in ["LOAD_FAST", "STORE_FAST"]:
                name = code.code_obj.co_varnames[arg]
                if name in args:
                    if op == "LOAD_FAST":
                        code.code[i] = ("LOAD_DEREF", code.freevars.index(name))
                    else:
                        code.code[i] = ("STORE_DEREF", code.freevars.index(name))

        caller_locals = sys._getframe(1).f_locals
        return types.FunctionType(
            code.to_code(),
            f.func_globals,
            f.func_name,
            f.func_defaults,
            tuple(caller_locals["_[%s]" % (name, )] for name in args)
        )
    return decorator

Dabei zeigt sich ein weiteres Problem: Man muss an die Cell-Objekte der äußeren Funktion bekommen. Dies tun wir, indem wir nach jedem STORE_DEREF (also jedes mal, wenn ein Wert in ein Cell-Objekt geschrieben wird), zwei weitere Opcodes einfügen: LOAD_CLOSURE und STORE_FAST. Damit wird direkt nach dem Speichern das Cell-Objekt auf den Stack geladen und in einem lokalen Namen gespeichert. Dazu führen wir für jeden Namen, der ein Cell-Objekt hat, eine lokale Variable _[<Name>] ein. Die [] deshalb, dass der Name nicht mit einem anderen lokalen Namen zu Konflikten führt. Im Dekorator der inneren Funktion kann man dann einfach über f_locals vom Frame des Aufrufers auf die Cell-Objekte zugreifen, den man mit sys_getframe(1) bekommt. Der Dekorator für die äußere Funktion sieht also so aus:

def outer(*args):
    def decorator(f):
        code = Code.from_code(f.func_code)
        code.varnames.extend("_[%s]" % (name, ) for name in args)
        code.nlocals += len(args)
        code.cellvars.extend(args)

        i = 0
        while i < len(code.code):
            (op, arg) = code.code[i]
            if op == "STORE_DEREF":
                if arg < len(code.cellvars) and code.cellvars[arg] in args:
                    name = code.cellvars[arg]
                    code.code[i+1:i+1] = [
                        ("LOAD_CLOSURE", arg),
                        ("STORE_FAST", code.varnames.index("_[%s]" % (name, )))
                    ]
                    i += 2
            i += 1

        f.func_code = code.to_code()
        return f
    return decorator

Und die Dekoratoren können dann wie folgt benutzt werden:

import nonlocal

@nonlocal.outer("a")
def spam():
    a = 23

    def getter():
        return a

    @nonlocal.nonlocal("a")
    def increaser(n):
        a += n

    return (getter, increaser)

g, i = spam()
assert g() == 23
i(19)
assert g() == 42

Dabei ist anzumerken, dass dabei nicht alle Fälle abgedeckt sind und außerdem davon ausgegangen wird, dass die äußere Funktion bereits Cell-Objekte für die nonlocal-Namen benutzt (in diesem Fall wird dies ausgelöst durch die getter()-Funktion).

Außerdem wird Code-Klasse von Aaron Gallagher benutzt, die bereits aus einem vorigen Blogeintrag bekannt ist:

class Code(object):
    @classmethod
    def from_code(cls, code_obj):
        self = cls()
        self.code_obj = code_obj
        self.cellvars = list(code_obj.co_cellvars)
        self.freevars = list(code_obj.co_freevars)
        self.names = list(code_obj.co_names)
        self.nlocals = code_obj.co_nlocals
        self.varnames = list(code_obj.co_varnames)
        self.consts = list(code_obj.co_consts)
        ret = []
        line_starts = dict(dis.findlinestarts(code_obj))
        code = code_obj.co_code
        labels = dict((addr, Label()) for addr in dis.findlabels(code))
        i, l = 0, len(code)
        extended_arg = 0
        while i < l:
            op = ord(code[i])
            if i in labels:
                ret.append(('MARK_LABEL', labels[i]))
            if i in line_starts:
                ret.append(('MARK_LINENO', line_starts[i]))
            i += 1
            if op >= opcode.HAVE_ARGUMENT:
                arg, = short.unpack(code[i:i + 2])
                arg += extended_arg
                extended_arg = 0
                i += 2
                if op == opcode.EXTENDED_ARG:
                    extended_arg = arg << 16
                    continue
                elif op in opcode.hasjabs:
                    arg = labels[arg]
                elif op in opcode.hasjrel:
                    arg = labels[i + arg]
            else:
                arg = None
            ret.append((opcode.opname[op], arg))
        self.code = ret
        return self

    def to_code(self):
        code_obj = self.code_obj
        co_code = array.array('B')
        co_lnotab = array.array('B')
        label_pos = {}
        jumps = []
        lastlineno = code_obj.co_firstlineno
        lastlinepos = 0
        for op, arg in self.code:
            if op == 'MARK_LABEL':
                label_pos[arg] = len(co_code)
            elif op == 'MARK_LINENO':
                incr_lineno = arg - lastlineno
                incr_pos = len(co_code) - lastlinepos
                lastlineno = arg
                lastlinepos = len(co_code)

                if incr_lineno == 0 and incr_pos == 0:
                    co_lnotab.append(0)
                    co_lnotab.append(0)
                else:
                    while incr_pos > 255:
                        co_lnotab.append(255)
                        co_lnotab.append(0)
                        incr_pos -= 255
                    while incr_lineno > 255:
                        co_lnotab.append(incr_pos)
                        co_lnotab.append(255)
                        incr_pos = 0
                        incr_lineno -= 255
                    if incr_pos or incr_lineno:
                        co_lnotab.append(incr_pos)
                        co_lnotab.append(incr_lineno)
            elif arg is not None:
                op = opcode.opmap[op]
                if op in opcode.hasjabs or op in opcode.hasjrel:
                    jumps.append((len(co_code), arg))
                    arg = 0
                if arg > 0xffff:
                    co_code.extend((opcode.EXTENDED_ARG,
                        (arg >> 16) & 0xff, (arg >> 24) & 0xff))
                co_code.extend((op,
                    arg & 0xff, (arg >> 8) & 0xff))
            else:
                co_code.append(opcode.opmap[op])

        for pos, label in jumps:
            jump = label_pos[label]
            if co_code[pos] in opcode.hasjrel:
                jump -= pos + 3
            assert jump <= 0xffff
            co_code[pos + 1] = jump & 0xff
            co_code[pos + 2] = (jump >> 8) & 0xff

        return types.CodeType(code_obj.co_argcount, self.nlocals,
            code_obj.co_stacksize, code_obj.co_flags, co_code.tostring(),
            tuple(self.consts), tuple(self.names), tuple(self.varnames),
            code_obj.co_filename, code_obj.co_name, code_obj.co_firstlineno,
            co_lnotab.tostring(), tuple(self.freevars), tuple(self.cellvars))

Wed 06 October 2010

Variable Tab-Breite in Python

In Python 2 kann man dem Tokenizer mittels Kommentaren mitteilen, wie breit ein Tab ist:

# :ts=4
if 1:
    if 0:
        print 1
     print 2 # hard tab here

Führt man den Code aus, wird 2 ausgegeben. Entfernt man den Kommentar, wird nichts ausgegeben.

Sat 14 November 2009

Getter und Setter mit Dekoratoren unter Python 2.6

class Spam(object):
    def __init__(self, value):
        self.value = value

    @property
    def spam(self):
        return self.value

    @spam.setter
    def spam(self, value):
        self.value = value

spam = Spam(42)
print spam.spam
spam.spam = 23
print spam.spam

Wed 30 September 2009

marshal-Spaß

>>> import marshal
>>> marshal.loads('l\x02\x00\x00\x00\x00\x00\x00\x00')
00000L
>>> bool(_)
True
>>>

Wed 23 September 2009

Ausgabe in der Python-REPL abschneiden

Dazu benutzt man einfach sys.displayhook. bpython und Python führen beim Starten die Datei aus, auf die die Umgebungsvariable PYTHONSTARTUP zeigt. Man kann sich also einfach eine Datei mit folgendem Inhalt anlegen:

import __builtin__
import sys

def displayhook(value):
    if value is not None:
        __builtin__._ = value
        out = repr(value)
        if len(out) > 42:
            out = out[:42] + '... (truncated)'
        print out
sys.displayhook = displayhook

Dann wird die Ausgabe automatisch abgeschnitten. Mag man dann die eigentliche Ausgabe, kann man print _ bzw. print repr(_) benutzen.

Sat 19 September 2009

eval() in Python ist nicht sicher

Häufig findet man Leute, die gerne mathematische Ausdrücke in Python evaluieren wollen. Dabei kommen sie auf die Idee, dass man dazu ja eval() benutzen könnte. Und damit das ganze auch noch sicher ist, übergibt man eigene globals und locals, in den man optimalerweise __builtins__ auf None setzt, da dann CPythons restricted mode aktiviert wird, ein Überbleibsel aus vergangenen Zeiten. Im restricted mode darf man auf besstimme Attribute von Funktionsobjekten und Klassen nicht zugreifen, wie etwa __defaults__, womit ein trügerisches Gefühl von Sicherheit gegeben wird.

Warum ist das nicht sicher? Zunächst einmal hat man irgendwann gemerkt, dass man in CPython einfach keine sichere Sandbox hinbekommt, weshalb man diesen Mode nicht mehr so wirklich pflegt. Seit Python 2.6 haben Generatoren ein gi_code-Attribut, wodurch man letztlich auch eigene Code-Objekte und damit beliebige Funktionen erstellen kann. Das führt uns also zu folgendem Code:

ns = dict(__builtins__=None)

print eval("""(lambda d={}, t=(1).__class__.__class__:
                (t(lambda: 23)(
                    t((_ for _ in []).gi_code)(
                        0, 1, 4, 67,"""
                        # SETUP_EXCEPT
                        r"""'y\x0c\x00'"""
                        # LOAD_CONST 1
                        r"""'d\x01\x00'"""
                        # LOAD_CONST 0
                        r"""'d\x02\x00'"""
                        # BINARY_DIVIDE
                        r"""'\x15'"""
                        # POP_TOP
                        r"""'\x01'"""
                        # POP_BLOCK
                        r"""'W'"""
                        # JUMP FORWARD 9
                        r"""'n\x09\x00'"""
                        # POP_TOP
                        r"""'\x01'"""
                        # POP_TOP
                        r"""'\x01'"""
                        # Now the traceback object is on TOS
                        # STORE_GLOBAL tb
                        r"""'a\x00\x00'"""
                        # JUMP_FORWARD 1
                        r"""'n\x01\x00'"""
                        # END_FINALLY
                        r"""'X'"""
                        # LOAD_CONST None
                        r"""'d\x00\x00'"""
                        # RETURN_VALUE
                        """'S',
                        (None, 1, 0), ('tb', ), (),
                        'evil.py', 'evil', 1, ''
                    ), d, None, ()
                )(),d['tb'].tb_frame.f_back.f_back.f_back.f_globals)[1])()""",
                ns, ns)

Führt man das aus, fällt einem auf, dass man damit an das richtige __builtins__-Objekt kommt, und damit dann auch beispielsweise __import__ aufrufen kann. Wie funktioniert das aber? Zunächst einmal holt man sich das Builtin type mit (1).__class__.__class__ und speichert es sich in t (indem man es als Default-Argument eines lambdas benutzt und den eigentlichen Code im Körper des lambdas schreibt und das lambda dann aufruft). Damit erstellt man dann ein neues Code-Objekt, das man durch t((_ for _ in []).gi_code) bekommt und damit dann ein neues Funktionsobjekt, das man mit t(lambda: 23) bekommt. Dieses Funktionsobjekt wird dann ausgeführt.

Was macht das konstruierte Funktionsobjekt beim Ausführen? Dazu schaut man sich die folgende Funktion an:

def throws():
    try:
        1 / 0
    except:
        pass

Der Bytecode der Funktion sieht dabei wie folgt aus:

3           0 SETUP_EXCEPT            12 (to 15)

4           3 LOAD_CONST               1 (1)
            6 LOAD_CONST               2 (0)
            9 BINARY_DIVIDE
           10 POP_TOP
           11 POP_BLOCK
           12 JUMP_FORWARD             7 (to 22)

5     >>   15 POP_TOP
           16 POP_TOP
           17 POP_TOP

6          18 JUMP_FORWARD             1 (to 22)
           21 END_FINALLY
      >>   22 LOAD_CONST               0 (None)
           25 RETURN_VALUE

Man beachte die drei POP_TOPs im Mittelteil (der den except-Block darstellt): CPython speichert bei einer Ausnahme den Typ der Ausnahme, die Ausnahme selbst und ein Traceback-Objekt auf dem Stack (das, was sys.exc_info() zurückliefert). Und bekanntlich kommt man mit Traceback-Objekten an ein Frame-Objekt, und mit Frame-Objekten kann man sich den Stack hochhangeln und kommt somit an die globals des Aufrufers, in der man die richtigen __builtins__ findet. Ergo konstruiert man sich einfach eine Funktion, die das Traceback-Objekt eben in einem globalen Namen speichert anstatt es einfach vom Stack verschwinden zu lassen.

Mon 24 August 2009

Wegoptimierte Syntaxfehler

Manche Syntaxfehler können in CPython wegoptimiert werden:

>>> if 0:
...     yield
...
>>> if 1:
...     yield
  File "<input>", line 2
SyntaxError: 'yield' outside function (<input>, line 2)

Fri 03 April 2009

tail call optimization in CPython

CPython unterstützt ja bekanntermaßen keine TCO von sich aus, weshalb es zahlreiche mehr oder weniger schöne Hacks gibt, die das CPython beibringen. Aber einen wirklich beeindruckenden habe ich eben in #python auf Freenode gesehen, geschrieben von habnabit. Und zwar wird der Bytecode geändert, CALL_FUNCTION wird zu JUMP_ABSOLUTE geändert.

Nachtrag: Das müsste eigentlich recht zuverlässig funktionieren, unter verschiedenen CPython-Versionen. Für das CALL_FUNCTION müssen die Argumente eh auf dem Stack liegen. Diese werden jetzt einfach mit STORE_FAST in die Namen geschrieben, die die Funktion entgegen nimmt, und dann wird wieder zum Anfang der Funktion gesprungen.

import inspect, pprint, types, dis, struct, opcode, array
short = struct.Struct('<H')

class Label(object):
    pass

class Code(object):
    @classmethod
    def from_code(cls, code_obj):
        self = cls()
        self.code_obj = code_obj
        self.names = list(code_obj.co_names)
        self.varnames = list(code_obj.co_varnames)
        self.consts = list(code_obj.co_consts)
        ret = []
        line_starts = dict(dis.findlinestarts(code_obj))
        code = code_obj.co_code
        labels = dict((addr, Label()) for addr in dis.findlabels(code))
        i, l = 0, len(code)
        extended_arg = 0
        while i < l:
            op = ord(code[i])
            if i in labels:
                ret.append(('MARK_LABEL', labels[i]))
            if i in line_starts:
                ret.append(('MARK_LINENO', line_starts[i]))
            i += 1
            if op >= opcode.HAVE_ARGUMENT:
                arg, = short.unpack(code[i:i + 2])
                arg += extended_arg
                extended_arg = 0
                i += 2
                if op == opcode.EXTENDED_ARG:
                    extended_arg = arg << 16
                    continue
                elif op in opcode.hasjabs:
                    arg = labels[arg]
                elif op in opcode.hasjrel:
                    arg = labels[i + arg]
            else:
                arg = None
            ret.append((opcode.opname[op], arg))
        self.code = ret
        return self

    def to_code(self):
        code_obj = self.code_obj
        co_code = array.array('B')
        co_lnotab = array.array('B')
        label_pos = {}
        jumps = []
        lastlineno = code_obj.co_firstlineno
        lastlinepos = 0
        for op, arg in self.code:
            if op == 'MARK_LABEL':
                label_pos[arg] = len(co_code)
            elif op == 'MARK_LINENO':
                incr_lineno = arg - lastlineno
                incr_pos = len(co_code) - lastlinepos
                lastlineno = arg
                lastlinepos = len(co_code)

                if incr_lineno == 0 and incr_pos == 0:
                    co_lnotab.append(0)
                    co_lnotab.append(0)
                else:
                    while incr_pos > 255:
                        co_lnotab.append(255)
                        co_lnotab.append(0)
                        incr_pos -= 255
                    while incr_lineno > 255:
                        co_lnotab.append(incr_pos)
                        co_lnotab.append(255)
                        incr_pos = 0
                        incr_lineno -= 255
                    if incr_pos or incr_lineno:
                        co_lnotab.append(incr_pos)
                        co_lnotab.append(incr_lineno)
            elif arg is not None:
                op = opcode.opmap[op]
                if op in opcode.hasjabs or op in opcode.hasjrel:
                    jumps.append((len(co_code), arg))
                    arg = 0
                if arg > 0xffff:
                    co_code.extend((opcode.EXTENDED_ARG,
                        (arg >> 16) & 0xff, (arg >> 24) & 0xff))
                co_code.extend((op,
                    arg & 0xff, (arg >> 8) & 0xff))
            else:
                co_code.append(opcode.opmap[op])

        for pos, label in jumps:
            jump = label_pos[label]
            if co_code[pos] in opcode.hasjrel:
                jump -= pos + 3
            assert jump <= 0xffff
            co_code[pos + 1] = jump & 0xff
            co_code[pos + 2] = (jump >> 8) & 0xff

        return types.CodeType(code_obj.co_argcount, code_obj.co_nlocals,
            code_obj.co_stacksize, code_obj.co_flags, co_code.tostring(),
            tuple(self.consts), tuple(self.names), tuple(self.varnames),
            code_obj.co_filename, code_obj.co_name, code_obj.co_firstlineno,
            co_lnotab.tostring(), code_obj.co_freevars, code_obj.co_cellvars)

    def const_idx(self, val):
        try:
            return self.consts.index(val)
        except ValueError:
            self.consts.append(val)
            return len(self.consts) - 1

def tail_call(func):
    code = Code.from_code(func.func_code)
    func_name = func.__name__
    if func_name in code.varnames:
        raise SyntaxError('"%s" was found as a local variable in the function' %
            func_name)
    try:
        name_idx = code.names.index(func_name)
    except IndexError:
        raise SyntaxError('"%s" not found in function\'s global names' %
            func_name)
    last_idx = 0
    func_start = Label()
    code.code.insert(0, ('MARK_LABEL', func_start))
    while True:
        try:
            lglobal_idx = code.code.index(('LOAD_GLOBAL', name_idx), last_idx)
        except ValueError:
            break

        if code.code[lglobal_idx - 1][0] != 'MARK_LINENO':
            last_idx = lglobal_idx + 1
            continue

        try:
            return_idx = code.code.index(('RETURN_VALUE', None), lglobal_idx)
        except ValueError:
            raise SyntaxError('"return" not found in function after "%s"' %
                func_name)

        if (return_idx != len(code.code) - 1
                and code.code[return_idx + 1][0] != 'MARK_LINENO'):
            last_idx = return_idx + 1
            continue

        if code.code[return_idx - 1][0] in ('CALL_FUNCTION_VAR',
                'CALL_FUNCTION_KW', 'CALL_FUNCTION_VAR_KW'):
            raise SyntaxError('calling with *a and/or **kw is unsupported')

        if code.code[return_idx - 1][0] != 'CALL_FUNCTION':
            last_idx = return_idx + 1
            continue

        if code.code[return_idx - 1][1] & 0xff00:
            raise SyntaxError('calling with keyword arguments is unsupported')

        arg_names, _, _, defaults = inspect.getargspec(func)
        n_args = code.code[return_idx - 1][1]
        if defaults is None:
            defaults = ()
        if n_args + len(defaults) < len(arg_names):
            raise SyntaxError('not enough arguments provided')

        new_bytecode = []
        if n_args < len(arg_names):
            new_bytecode.extend(
                ('LOAD_CONST', code.const_idx(d))
                for d in defaults[n_args - len(arg_names):])
        new_bytecode.extend(
            ('STORE_FAST', code.varnames.index(arg))
            for arg in reversed(arg_names))
        new_bytecode.append(('JUMP_ABSOLUTE', func_start))
        code.code[return_idx - 1:return_idx + 1] = new_bytecode
        del code.code[lglobal_idx]

    func.func_code = code.to_code()
    return func

def factorial(n, acc=1):
    if n <= 0:
        return acc
    return factorial(n - 1, n * acc)

dis.dis(factorial)
factorial = tail_call(factorial)
print
dis.dis(factorial)

print factorial(10000)