blob: ba78d55d1610b26b0b1e6eb174d77185d7f7037a [file] [log] [blame]
import threading
import asyncio
import hangups
import logging
from hangups.auth import RefreshTokenCache
from hangups.user import UserID
class CredentialsPrompt(object):
def __init__(self, email, password):
self.email = email
self.password = password
def get_email(self):
return self.email
def get_password(self):
return self.password
class HangupsThread(threading.Thread):
def __init__(self, xmpp, remote, gmail='', password=''):
self.xmpp = xmpp
self.remote = remote
self.presence_timer = None # filled in on first incoming presence
self.presence_timer_lock = threading.Lock()
super().__init__(name='HangupsThread-{}'.format(remote))
self.cookie = hangups.auth.get_auth(CredentialsPrompt(gmail, password), RefreshTokenCache('{}-refresh_token.cache'.format(remote)))
def run(self):
policy = asyncio.get_event_loop_policy()
self.loop = policy.new_event_loop()
policy.set_event_loop(self.loop)
self.client = hangups.Client(self.cookie)
self.client.on_connect.add_observer(self._on_connect)
self.client.on_disconnect.add_observer(self._on_disconnect)
self.client.on_reconnect.add_observer(self._on_reconnect)
self.client.on_connect.fire()
self.loop.run_until_complete(self.client.connect())
logging.info('Hangups thread stopped for {}.'.format(self.remote))
if self.presence_timer is not None:
self.presence_timer.cancel()
def _stop(self):
yield from self.client.disconnect()
self.loop.stop()
self.loop = None
def stop(self):
if self.loop is not None:
self.loop.call_soon_threadsafe(asyncio.async, self._stop())
def _presence_timer_run(self):
# if we haven't heard from google for a while, re-query all
# the presence states just to remind it that we exist
# without this, presence seems to get forgotten eventually
# by the server
logging.info('Presence Timer fired, requerying presence for {}'.format(self.remote))
self.loop.call_soon_threadsafe(asyncio.async, self._on_connect_common())
def presence_timer_reset(self):
with self.presence_timer_lock:
if self.presence_timer is not None:
self.presence_timer.cancel()
self.presence_timer = threading.Timer(3600, self._presence_timer_run)
self.presence_timer.start()
def incoming_presence(self, user, pres):
self.presence_timer_reset()
mood_message = ''
for segment in pres.mood_message.mood_content.segment:
mood_message += segment.text
self.xmpp.incoming_presence(self.remote, user, pres.reachable,
pres.available, mood_message)
@asyncio.coroutine
def _on_connect_common(self):
for user in self.user_list.get_all():
if user.is_self:
continue
self.xmpp.incoming_user(self.remote, user)
pres = yield from self.query_presence(user)
self.incoming_presence(user, pres)
@asyncio.coroutine
def _on_connect(self):
logging.info('Connected to google hangouts service for %s', self.remote)
self.user_list, self.conv_list = (
yield from hangups.build_user_conversation_list(self.client)
)
self.client.on_state_update.add_observer(self._on_state_update)
self.conv_list.on_event.add_observer(self._on_event)
self.conv_list.on_typing.add_observer(self._on_typing)
yield from self._on_connect_common()
@asyncio.coroutine
def _on_disconnect(self):
logging.info('Disconnect from google hangouts for %s', self.remote)
self.xmpp.incoming_disconnect(self.remote)
@asyncio.coroutine
def _on_reconnect(self):
logging.info('Reconnect to google hangouts for %s', self.remote)
self.user_list, self.conv_list = (
yield from hangups.build_user_conversation_list(self.client)
)
yield from self._on_connect_common()
@asyncio.coroutine
def _on_state_update(self, state_update):
"""Receive a StateUpdate"""
notification_type = state_update.WhichOneof('state_update')
if notification_type == 'presence_notification':
yield from self._handle_presence_notification(state_update.presence_notification)
elif notification_type == 'focus_notification':
yield from self._handle_focus_notification(state_update.focus_notification)
@asyncio.coroutine
def _handle_presence_notification(self, presence_notification):
for presence in presence_notification.presence:
if not presence.HasField('presence'):
continue
user = self.user_list.get_user(UserID(chat_id=presence.user_id.chat_id,
gaia_id=presence.user_id.gaia_id))
if not user:
continue
# the notification usually only contains the timestamp, so
# ask for the values we want
pres = yield from self.query_presence(user)
self.incoming_presence(user, pres)
@asyncio.coroutine
def _handle_focus_notification(self, focus_notification):
# focus is telling us something about the client, but google
# will helpfully translate it to presence
sid = focus_notification.sender_id
user = self.user_list.get_user(UserID(chat_id=sid.chat_id,
gaia_id=sid.gaia_id))
pres = yield from self.query_presence(user)
self.incoming_presence(user, pres)
@asyncio.coroutine
def query_presence(self, user):
req = hangups.hangouts_pb2.QueryPresenceRequest(
request_header = self.client.get_request_header(),
participant_id=[
hangups.hangouts_pb2.ParticipantId(gaia_id=user.id_.gaia_id),
],
field_mask=[
hangups.hangouts_pb2.FIELD_MASK_REACHABLE,
hangups.hangouts_pb2.FIELD_MASK_AVAILABLE,
hangups.hangouts_pb2.FIELD_MASK_MOOD,
hangups.hangouts_pb2.FIELD_MASK_LAST_SEEN,
],
)
res = yield from self.client.query_presence(req)
if not hasattr(res, 'presence_result'):
return None
for presence in res.presence_result:
return presence.presence
def get_conversation_for_user(self, gaia_id):
for conv in self.conv_list.get_all():
if conv._conversation.type != hangups.hangouts_pb2.CONVERSATION_TYPE_ONE_TO_ONE:
continue
for user in conv.users:
if user.id_.gaia_id == gaia_id:
return conv
@asyncio.coroutine
def _send_message(self, msg):
mto = msg.get_to().local
conv = self.get_conversation_for_user(mto)
if conv:
segments = hangups.ChatMessageSegment.from_str(msg['body'])
yield from conv.send_message(segments)
else:
logging.error("FAILED TO FIND CONV FOR %s", mto)
def send_message(self, msg):
if self.loop is not None:
self.loop.call_soon_threadsafe(asyncio.async, self._send_message(msg))
def _on_event(self, conv_event):
conv = self.conv_list.get(conv_event.conversation_id)
user = conv.get_user(conv_event.user_id)
if isinstance(conv_event, hangups.ChatMessageEvent):
# Event is a chat message: foward it to XMPP.
if conv._conversation.type == hangups.hangouts_pb2.CONVERSATION_TYPE_ONE_TO_ONE:
if not user.is_self:
self.xmpp.incoming_message(self.remote, user, conv_event.text)
def _on_typing(self, typ):
user = self.user_list.get_user(UserID(chat_id=typ.user_id.chat_id,
gaia_id=typ.user_id.gaia_id))
if user is None:
return
typing_states = {
hangups.hangouts_pb2.TYPING_TYPE_UNKNOWN: 'gone',
hangups.hangouts_pb2.TYPING_TYPE_STARTED: 'composing',
hangups.hangouts_pb2.TYPING_TYPE_PAUSED: 'paused',
hangups.hangouts_pb2.TYPING_TYPE_STOPPED: 'inactive',
}
self.xmpp.incoming_typing(self.remote, user, typing_states[typ.status])
def send_typing_notification(self, msg):
self.client.set_active()
mto = msg.get_to().local
conv = self.get_conversation_for_user(mto)
if self.loop is not None:
self.loop.call_soon_threadsafe(asyncio.async, self._send_typing_notification(msg, conv))
@asyncio.coroutine
def _send_typing_notification(self, msg, conv):
typ = hangups.hangouts_pb2.TYPING_TYPE_PAUSED
if msg['chat_state'] == 'composing':
typ = hangups.hangouts_pb2.TYPING_TYPE_STARTED
yield from conv.set_typing(typ)