| import threading |
| import asyncio |
| import hangups |
| import logging |
| from hangups.auth import RefreshTokenCache |
| from hangups.user import UserID |
| |
class CredentialsPrompt:
    """Non-interactive credential holder.

    An instance is passed to ``hangups.auth.get_auth`` in place of an
    interactive prompt; it simply hands back the e-mail address and
    password it was constructed with.
    """

    def __init__(self, email, password):
        # Stored as plain public attributes; the getters below only read them.
        self.email, self.password = email, password

    def get_email(self):
        """Return the e-mail address given at construction time."""
        return self.email

    def get_password(self):
        """Return the password given at construction time."""
        return self.password
| |
class HangupsThread(threading.Thread):
    """Thread that runs one hangups client and relays its events to XMPP.

    The thread owns a private asyncio event loop (created in ``run``).
    Everything that talks to hangups is a coroutine executed on that loop;
    the public, synchronous methods (``stop``, ``send_message``,
    ``send_typing_notification``) may be called from other threads and
    merely hand work over to the loop.

    Events coming from hangups are forwarded to the ``xmpp`` object through
    its ``incoming_user``, ``incoming_presence``, ``incoming_message``,
    ``incoming_typing`` and ``incoming_disconnect`` callbacks, each of which
    receives ``remote`` as its first argument.
    """

    # Seconds without any presence update before every presence is
    # re-queried (see _presence_timer_run).
    PRESENCE_REFRESH_INTERVAL = 3600

    # Class-level defaults: these attributes are only filled in once run()
    # (loop, client) or the connect handler (user_list, conv_list) has
    # executed.  Having them default to None lets the public methods be
    # called safely before that point instead of raising AttributeError.
    loop = None
    client = None
    user_list = None
    conv_list = None

    def __init__(self, xmpp, remote, gmail='', password=''):
        """Authenticate against Google and prepare the thread.

        Args:
            xmpp: object receiving the ``incoming_*`` callbacks.
            remote: identifier passed back to every ``xmpp`` callback; also
                used for the thread name and the refresh-token cache file.
            gmail: account e-mail handed to ``hangups.auth.get_auth``.
            password: account password handed to ``hangups.auth.get_auth``.
        """
        self.xmpp = xmpp
        self.remote = remote
        self.presence_timer = None  # filled in on first incoming presence
        self.presence_timer_lock = threading.Lock()

        super().__init__(name='HangupsThread-{}'.format(remote))

        self.cookie = hangups.auth.get_auth(
            CredentialsPrompt(gmail, password),
            RefreshTokenCache('{}-refresh_token.cache'.format(remote)),
        )

    def run(self):
        """Thread body: create the event loop and run the client on it.

        Returns when ``client.connect()`` completes.  The loop is closed and
        the presence timer cancelled on the way out, even on error.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self.loop = loop
        try:
            self.client = hangups.Client(self.cookie)
            self.client.on_connect.add_observer(self._on_connect)
            self.client.on_disconnect.add_observer(self._on_disconnect)
            self.client.on_reconnect.add_observer(self._on_reconnect)

            # The previous code also called on_connect.fire() here without
            # awaiting or scheduling the result.  The only observer is a
            # coroutine function, so that call never executed anything and
            # has been dropped; hangups fires the event itself while
            # connecting.
            loop.run_until_complete(self.client.connect())
        finally:
            # From here on nothing may be scheduled on the loop any more.
            self.loop = None
            self._cancel_presence_timer()
            loop.close()
            logging.info('Hangups thread stopped for %s.', self.remote)

    def _schedule(self, coro_func, *args):
        """Run ``coro_func(*args)`` as a task on the thread's event loop.

        Safe to call from any thread.  The coroutine object is created on
        the loop thread, so nothing is left un-awaited if the loop is gone.

        Returns:
            True if the work was handed to the loop, False if there is no
            usable loop (thread not started yet, or already stopped).
        """
        loop = self.loop  # snapshot: _stop()/run() may reset it concurrently
        if loop is None:
            return False
        try:
            loop.call_soon_threadsafe(
                lambda: loop.create_task(coro_func(*args)))
        except RuntimeError:
            # The loop was closed between the check and the call.
            return False
        return True

    def _cancel_presence_timer(self):
        """Cancel and forget the pending presence timer, if there is one."""
        with self.presence_timer_lock:
            if self.presence_timer is not None:
                self.presence_timer.cancel()
                self.presence_timer = None

    async def _stop(self):
        """Disconnect the client, then stop the event loop."""
        loop = self.loop
        await self.client.disconnect()
        if loop is not None:
            loop.stop()
        self.loop = None

    def stop(self):
        """Request a shutdown; does nothing if the loop is not available."""
        self._schedule(self._stop)

    def _presence_timer_run(self):
        """Timer callback: re-query the presence of every known user."""
        # if we haven't heard from google for a while, re-query all
        # the presence states just to remind it that we exist
        # without this, presence seems to get forgotten eventually
        # by the server
        logging.info('Presence Timer fired, requerying presence for %s',
                     self.remote)
        self._schedule(self._on_connect_common)

    def presence_timer_reset(self):
        """(Re)start the countdown that triggers ``_presence_timer_run``."""
        with self.presence_timer_lock:
            if self.presence_timer is not None:
                self.presence_timer.cancel()
            timer = threading.Timer(self.PRESENCE_REFRESH_INTERVAL,
                                    self._presence_timer_run)
            # Daemon, so a pending timer can never keep the process alive.
            timer.daemon = True
            timer.start()
            self.presence_timer = timer

    def incoming_presence(self, user, pres):
        """Forward one presence record to XMPP and restart the timer.

        Args:
            user: the user the presence belongs to.
            pres: presence object as returned by ``query_presence``, or
                None when the query produced no result (then nothing is
                forwarded).
        """
        self.presence_timer_reset()
        if pres is None:
            # query_presence() returns None when the server sent no result;
            # previously this crashed on the attribute access below.
            logging.debug('No presence available for a user of %s',
                          self.remote)
            return
        mood_message = ''.join(
            segment.text
            for segment in pres.mood_message.mood_content.segment
        )
        self.xmpp.incoming_presence(self.remote, user, pres.reachable,
                                    pres.available, mood_message)

    async def _on_connect_common(self):
        """Announce every user except ourselves to XMPP, with presence."""
        for user in self.user_list.get_all():
            if user.is_self:
                continue
            self.xmpp.incoming_user(self.remote, user)
            pres = await self.query_presence(user)
            self.incoming_presence(user, pres)

    async def _refresh_lists(self):
        """Rebuild ``user_list`` and ``conv_list`` from the server."""
        self.user_list, self.conv_list = (
            await hangups.build_user_conversation_list(self.client)
        )

    async def _on_connect(self):
        """Observer for the client's ``on_connect`` event."""
        logging.info('Connected to google hangouts service for %s',
                     self.remote)
        await self._refresh_lists()
        self.client.on_state_update.add_observer(self._on_state_update)
        self.conv_list.on_event.add_observer(self._on_event)
        self.conv_list.on_typing.add_observer(self._on_typing)
        await self._on_connect_common()

    async def _on_disconnect(self):
        """Observer for the client's ``on_disconnect`` event."""
        logging.info('Disconnect from google hangouts for %s', self.remote)
        self.xmpp.incoming_disconnect(self.remote)

    async def _on_reconnect(self):
        """Observer for the client's ``on_reconnect`` event."""
        logging.info('Reconnect to google hangouts for %s', self.remote)
        # NOTE(review): the freshly built conv_list does not get _on_event /
        # _on_typing attached here (only _on_connect does that).  Behaviour
        # is kept as it was; confirm whether the list built on connect keeps
        # delivering events after a reconnect.
        await self._refresh_lists()
        await self._on_connect_common()

    async def _on_state_update(self, state_update):
        """Receive a StateUpdate and dispatch presence/focus notifications."""
        notification_type = state_update.WhichOneof('state_update')
        if notification_type == 'presence_notification':
            await self._handle_presence_notification(
                state_update.presence_notification)
        elif notification_type == 'focus_notification':
            await self._handle_focus_notification(
                state_update.focus_notification)

    async def _handle_presence_notification(self, presence_notification):
        """Re-query and forward presence for every user in a notification."""
        for presence in presence_notification.presence:
            if not presence.HasField('presence'):
                continue
            user = self.user_list.get_user(
                UserID(chat_id=presence.user_id.chat_id,
                       gaia_id=presence.user_id.gaia_id))
            if not user:
                continue
            # the notification usually only contains the timestamp, so
            # ask for the values we want
            pres = await self.query_presence(user)
            self.incoming_presence(user, pres)

    async def _handle_focus_notification(self, focus_notification):
        """Translate a focus notification into a presence update."""
        # focus is telling us something about the client, but google
        # will helpfully translate it to presence
        sid = focus_notification.sender_id
        user = self.user_list.get_user(UserID(chat_id=sid.chat_id,
                                              gaia_id=sid.gaia_id))
        if not user:
            # Same guard as in _handle_presence_notification.
            return
        pres = await self.query_presence(user)
        self.incoming_presence(user, pres)

    async def query_presence(self, user):
        """Ask the server for the presence of ``user``.

        Requests the reachable, available, mood and last-seen fields.

        Returns:
            The ``presence`` member of the first entry in the response's
            ``presence_result``, or None if there is no such entry.
        """
        pb2 = hangups.hangouts_pb2
        req = pb2.QueryPresenceRequest(
            request_header=self.client.get_request_header(),
            participant_id=[
                pb2.ParticipantId(gaia_id=user.id_.gaia_id),
            ],
            field_mask=[
                pb2.FIELD_MASK_REACHABLE,
                pb2.FIELD_MASK_AVAILABLE,
                pb2.FIELD_MASK_MOOD,
                pb2.FIELD_MASK_LAST_SEEN,
            ],
        )
        res = await self.client.query_presence(req)
        if not hasattr(res, 'presence_result'):
            return None
        for result in res.presence_result:
            return result.presence
        return None

    def get_conversation_for_user(self, gaia_id):
        """Return the one-to-one conversation that includes ``gaia_id``.

        Returns None if no such conversation exists or the conversation
        list has not been built yet.
        """
        if self.conv_list is None:
            return None
        one_to_one = hangups.hangouts_pb2.CONVERSATION_TYPE_ONE_TO_ONE
        for conv in self.conv_list.get_all():
            if conv._conversation.type != one_to_one:
                continue
            if any(user.id_.gaia_id == gaia_id for user in conv.users):
                return conv
        return None

    async def _send_message(self, msg):
        """Send the body of ``msg`` to the conversation of its recipient."""
        mto = msg.get_to().local

        conv = self.get_conversation_for_user(mto)
        if conv:
            segments = hangups.ChatMessageSegment.from_str(msg['body'])
            await conv.send_message(segments)
        else:
            logging.error("FAILED TO FIND CONV FOR %s", mto)

    def send_message(self, msg):
        """Queue ``msg`` for sending; no-op while the loop is unavailable."""
        self._schedule(self._send_message, msg)

    def _on_event(self, conv_event):
        """Forward incoming one-to-one chat messages from others to XMPP."""
        conv = self.conv_list.get(conv_event.conversation_id)
        user = conv.get_user(conv_event.user_id)
        if not isinstance(conv_event, hangups.ChatMessageEvent):
            return
        # Event is a chat message: forward it to XMPP.
        one_to_one = hangups.hangouts_pb2.CONVERSATION_TYPE_ONE_TO_ONE
        if conv._conversation.type == one_to_one and not user.is_self:
            self.xmpp.incoming_message(self.remote, user, conv_event.text)

    def _on_typing(self, typ):
        """Forward a typing notification to XMPP as a chat-state name."""
        user = self.user_list.get_user(UserID(chat_id=typ.user_id.chat_id,
                                              gaia_id=typ.user_id.gaia_id))
        if user is None:
            return
        typing_states = {
            hangups.hangouts_pb2.TYPING_TYPE_UNKNOWN: 'gone',
            hangups.hangouts_pb2.TYPING_TYPE_STARTED: 'composing',
            hangups.hangouts_pb2.TYPING_TYPE_PAUSED: 'paused',
            hangups.hangouts_pb2.TYPING_TYPE_STOPPED: 'inactive',
        }
        self.xmpp.incoming_typing(self.remote, user,
                                  typing_states[typ.status])

    def send_typing_notification(self, msg):
        """Queue a typing notification for the recipient of ``msg``.

        No-op while the event loop is unavailable.
        """
        if self.loop is None:
            return
        conv = self.get_conversation_for_user(msg.get_to().local)
        self._schedule(self._send_typing_notification, msg, conv)

    async def _send_typing_notification(self, msg, conv):
        """Mark the client active and set the typing state on ``conv``.

        Sends "started" when ``msg['chat_state']`` is ``'composing'`` and
        "paused" otherwise.
        """
        if conv is None:
            logging.error("FAILED TO FIND CONV FOR %s", msg.get_to().local)
            return
        # set_active() used to be called, un-awaited, from the caller's
        # thread, so it never actually ran; await it on the loop instead.
        await self.client.set_active()
        typ = hangups.hangouts_pb2.TYPING_TYPE_PAUSED
        if msg['chat_state'] == 'composing':
            typ = hangups.hangouts_pb2.TYPING_TYPE_STARTED
        await conv.set_typing(typ)