import redis
import time
import threading
class RedisListener(object):
    """Fan out Redis pub/sub messages to registered clients.

    Constructing an instance starts a daemon thread that pattern-subscribes
    to ``channels`` and hands the payload of every published message to the
    ``send(msg)`` method of each registered client.
    """

    def __init__(self, channels: list,
                 host: str, port: int, db: int):
        """Start the background listener thread.

        Args:
            channels: Patterns passed to ``PubSub.psubscribe``.
            host: Redis server host.
            port: Redis server port.
            db: Redis database index.
        """
        self.__all_client = []
        self.__t = threading.Thread(target=self.__handler,
                                    args=(channels, host, port, db))
        # Daemon thread: it must not keep the interpreter alive on exit.
        self.__t.daemon = True
        self.__t.start()

    def register(self, client):
        """Add ``client`` to the recipients; it must expose ``send(msg)``."""
        self.__all_client.append(client)

    def __handler(self, channels: list, host: str, port: int, db: int):
        """Thread body: subscribe, then forward published messages forever."""
        connection = redis.Redis(
            connection_pool=redis.ConnectionPool(host=host, port=port, db=db))
        pubsub = connection.pubsub()
        try:
            pubsub.psubscribe(channels)
            while True:
                msg = pubsub.get_message(timeout=None)
                # get_message may return None, and subscription bookkeeping
                # messages (psubscribe/punsubscribe/...) carry an integer
                # payload; only real published messages are forwarded.
                if msg is None or msg["type"] not in ("message", "pmessage"):
                    continue
                data = msg["data"]
                # Payload is bytes unless the connection decodes responses.
                if isinstance(data, bytes):
                    data = data.decode("utf8")
                self.__send(data)
        finally:
            # Runs when the loop exits through an exception, so the pub/sub
            # object and the connection are released (the original close()
            # call after the infinite loop was unreachable).
            pubsub.close()
            connection.close()

    def __send(self, msg: str):
        """Deliver ``msg`` to every client, dropping those whose send fails."""
        # Iterate over a snapshot: removing from the list being iterated
        # would make the loop skip the client that follows a failing one.
        for client in list(self.__all_client):
            try:
                client.send(msg)
            except Exception:
                # Best effort: a client that cannot receive is unregistered.
                self.__all_client.remove(client)
class MyClient(object):
    """Minimal client that writes every received message to standard output."""

    def send(self, msg: str):
        """Print ``msg`` followed by a newline."""
        print(msg)
if __name__ == "__main__":
    # Demo: subscribe to every channel pattern and print what is published.
    listener = RedisListener(["*"], "10.60.2.114", 6379, 0)
    listener.register(MyClient())
    # The listener runs on a daemon thread, so the main thread has to stay
    # alive or the process would exit immediately.
    while True:
        time.sleep(0.1)