null

Использование модуля threading в Python

Как и во многих современных языках программирования (даже C++ в новом стандарте обзавелся их поддержкой), стандартная библиотека Python предоставляет средства для многопоточного программирования, и одно из таких средств - модуль threading. Давайте разберемся с его интерфейсом.

Надо отметить, что в CPython присутствует т.н. Global Interpreter Lock (GIL), которая запрещает присутствие нескольких потоков в пространстве интерпретатора в один момент времени, таким образом надо учитывать, что потоки Python не предоставляют параллелизма. Тем не менее они хороши в задачах, связанных с вводом-выводом. В IronPython и Jython GIL отстутсвует, а в CPython для параллельной обработки надо использовать модуль multiprocessing. Также см. тут: http://wiki.python.org/moin/ParallelProcessing


Класс потоков Thread


Все потоки, порождаемые в приложении, представляют из себя классы, наследуемые от класса threading.Thread. Главный метод потока - run() необходимо переопределить, ну и также не забыть вызвать __init__ родительского класса в конструкторе.
    

from threading import Thread

class HelloThread(Thread):
    def __init__(self, name):
        Thread.__init__(self, name=name)
    
    def run(self):
        print 'Hello from %s' % self.name

ht = HelloThread('HelloThread')
ht.start()



Метод start() запускает поток на исполнение. Проверить завершился ли дочерний поток можно вызвав функцию is_alive(), аргумент же name задает имя потока. Текущий исполняемый поток можно определить из функции current_thread().

Другой вариант - экземпляры класса Thread - в этом случае надо передать callable-объект через параметр target:

from threading import Thread, current_thread

def hello():
    print 'Hello from %s' % current_thread().name

ht = Thread(target=hello)
ht.start()
hello()

Примитивы синхронизации в модуле threading

Вместе с потоками, как известно, появляется проблема синхронизации данных, и threading предоставляет соответствующие интерфейсы. Простейшим типом примитива синхронизации является Lock - блокировка, реализующая два метода: acquire() и release(), которые как несложно догадаться захватывают и освобождают блокировку:
    

stdout_lck = Lock()

...

    def run(self):
        stdout_lck.acquire()
        
        print 'Hello from %s' % self.name
        
        stdout_lck.release()


Если в acquire передать аргумент blocking=False, выполняемый поток не будет заблокирован, а acquire вернет True в случае, если захват блокировки был успешным.

Итак, примитивы синхронизации:
    - Lock - блокировка
    - RLock - блокировка с возможностью повторного захвата
    - Condition - условная переменная
    - Semaphore - семафор   

Очереди потоков

Чтобы заблокировать поток на время исполнения дочернего существует метод join(), однако для многопоточной обработки этого мало - обычно необходимо работать с группой потоков. Простейшим способом организации такой группы является очередь Queue из одноименного модуля. Рассмотрим традиционную задачу по пингу большого количества хостов. Добавление в очередь производится с помощью метода put(), а сам поток уведомляет очередь о своем завершении через метод task_done(). Скрипт: multiping.py

Конечно очередь - удобный механизм, но надо заметить, что в этом случае на исполнение запускаются сразу все потоки, а это количество хотелось бы ограничить. Как вариант, можно использовать объекты типа Event, также определенные в модуле threading. Основной интерфейс состоит из двух методов: wait(), блокирующий исполнение потока и set(), устанавливающий флаг и разблокирующий ждущие потоки. На основе реализуем пул потоков:

slot_available = lambda thread: thread == None or thread.is_alive() == False

class ThreadPool:
    MAX_THREADS = 16
    
    def __init__(self):
        self.pool = [None] * self.MAX_THREADS
        self.event = Event()
    
    def is_available_slots(self):
        return any(slot_available(thread) for thread in self.pool)
                
    def dispatch(self, thread):
        if not self.is_available_slots():
            # Если нет свободных слотов
            # ждем пока не завершится один из потоков
            self.event.wait()
            self.event.clear()
        
        # Выбираем первый свободный слот и диспетчеризуем поток
        for slot_id in range(self.MAX_THREADS):
            if slot_available(self.pool[slot_id]):
                self.pool[slot_id] = thread
                thread.start()
                
                break



После этого конечно время выполнения скрипта увеличилось (с 14 секунд до почти 4х минут) :) Полный листинг скрипта представлен здесь: multiping-2.py

Ссылки


http://www.ibm.com/developerworks/aix/library/au-threadingpython/ - статья 'Practical threaded programming with Python'
http://docs.python.org/library/threading.html
 - документация по модулю threading



К списку статей

 

Интересуюсь по большей части системным анализом программного обеспечения: поиском багов и анализом неисправностей, а также системным программированием (и не оставляю надежд запилить свою операционку, хотя нехватка времени сказывается :) ). Программированием увлекаюсь с 12 лет, но так уж получилось, что стал я инженером.

Основная сфера моей деятельности связана с поддержкой Solaris и оборудования Sun/Oracle, хотя в последнее время к ним прибавились технологии виртуализации (линейка Citrix Xen) и всякое разное от IBM - от xSeries до Power. Учусь на кафедре Вычислительной Техники НИУ ИТМО.

See you...out there!

http://www.facebook.com/profile.php?id=100001947776045
https://twitter.com/AnnoyingBugs