пятница, 5 декабря 2014 г.

PGQ очередь с одним consumer и producer

Установим pgq:
$ sudo pip install pgqueue

Установим pgq-расширение для PostgreSQL:
$ sudo apt-get install postgresql-9.2-pgq3

После этого нужно через консоль подсоединиться к БД и выполнить команду на создание расширения pgq:
my_db=# CREATE EXTENSION IF NOT EXISTS pgq;

Теперь нужно запустить процесс ticker. Если это не сделать, то события будут добавляться в очередь, но не будут отправляться консьюмерам, что приведет к быстрому росту очереди.

Запустим простую питоновскую реализацию ticker:
python -m pgqueue 'host=127.0.0.1 port=5432 user=postgres password=123 dbname=johny'

Создадим скрипт consumer.py:
# encoding: utf-8
import psycopg2
import pgqueue

conn = psycopg2.connect("dbname=johny user=postgres password=123 host=localhost")
conn.autocommit = True
cursor = conn.cursor()

consum_q = pgqueue.Consumer('first_queue', 'consumer_one')
consum_q.register(cursor)

# Получаем события из очереди
conn.autocommit = False
for event in consum_q.next_events(cursor, commit=True):
    print(event)


При запуске этого скрипта события буду забираться из очереди. Но там пока нет событий. Чтобы они там появились, создадим файл publisher.py:
# encoding: utf-8
import psycopg2
import pgqueue

conn = psycopg2.connect("dbname=johny user=postgres password=123 host=localhost")
conn.autocommit = True
cursor = conn.cursor()

# Инициируем очередь
first_q = pgqueue.Queue('first_queue')
first_q.create(cursor, ticker_max_lag='4 seconds')

# Добавляем события в очередь
first_q.insert_event(cursor, 'greeting', 'How are you?')
first_q.insert_event(cursor, 'question', 'What are you doing?')

Чтобы события появились в очереди, запустим скрипт publisher.py:
$ python publisher.py

Теперь попробуем прочитать эти события из очереди, запустив consumer.py:
$ python consumer.py  

Если ответ не пришел, нужно убедиться, что процесс ticker запущен.