Установим pgq:
$ sudo pip install pgqueue
Установим pgq-расширение для PostgreSQL:
$ sudo apt-get install postgresql-9.2-pgq3
После этого нужно через консоль подсоединиться к БД и выполнить команду на создание расширения pgq:
my_db=# CREATE EXTENSION IF NOT EXISTS pgq;
Теперь нужно запустить процесс ticker. Если это не сделать, то события будут добавляться в очередь, но не будут отправляться консьюмерам, что приведет к быстрому росту очереди.
Запустим простую питоновскую реализацию
ticker:
python -m pgqueue 'host=127.0.0.1 port=5432 user=postgres password=123 dbname=johny'
Создадим скрипт
consumer.py:
# encoding: utf-8
import psycopg2
import pgqueue
conn = psycopg2.connect("dbname=johny user=postgres password=123 host=localhost")
conn.autocommit = True
cursor = conn.cursor()
consum_q = pgqueue.Consumer('first_queue', 'consumer_one')
consum_q.register(cursor)
# Получаем события из очереди
conn.autocommit = False
for event in consum_q.next_events(cursor, commit=True):
print(event)
При запуске этого скрипта события буду забираться из очереди.
Но там пока нет событий. Чтобы они там появились, создадим файл
publisher.py:
# encoding: utf-8
import psycopg2
import pgqueue
conn = psycopg2.connect("dbname=johny user=postgres password=123 host=localhost")
conn.autocommit = True
cursor = conn.cursor()
# Инициируем очередь
first_q = pgqueue.Queue('first_queue')
first_q.create(cursor, ticker_max_lag='4 seconds')
# Добавляем события в очередь
first_q.insert_event(cursor, 'greeting', 'How are you?')
first_q.insert_event(cursor, 'question', 'What are you doing?')
Чтобы события появились в очереди, запустим скрипт
publisher.py:
$ python publisher.py
Теперь попробуем прочитать эти события из очереди, запустив
consumer.py:
$ python consumer.py
Если ответ не пришел, нужно убедиться, что процесс ticker запущен.