| // Copyright 2015 The Weave Authors. All rights reserved. | 
 | // Use of this source code is governed by a BSD-style license that can be | 
 | // found in the LICENSE file. | 
 |  | 
 | #include "src/notification/xmpp_channel.h" | 
 |  | 
 | #include <string> | 
 |  | 
 | #include <base/bind.h> | 
 | #include <base/strings/string_number_conversions.h> | 
 | #include <weave/provider/network.h> | 
 | #include <weave/provider/task_runner.h> | 
 |  | 
 | #include "src/backoff_entry.h" | 
 | #include "src/data_encoding.h" | 
 | #include "src/notification/notification_delegate.h" | 
 | #include "src/notification/notification_parser.h" | 
 | #include "src/notification/xml_node.h" | 
 | #include "src/privet/openssl_utils.h" | 
 | #include "src/string_utils.h" | 
 | #include "src/utils.h" | 
 |  | 
 | namespace weave { | 
 |  | 
 | namespace { | 
 |  | 
 | std::string BuildXmppStartStreamCommand() { | 
 |   return "<stream:stream to='clouddevices.gserviceaccount.com' " | 
 |          "xmlns:stream='http://etherx.jabber.org/streams' " | 
 |          "xml:lang='*' version='1.0' xmlns='jabber:client'>"; | 
 | } | 
 |  | 
 | std::string BuildXmppAuthenticateCommand(const std::string& account, | 
 |                                          const std::string& token) { | 
 |   std::vector<uint8_t> credentials; | 
 |   credentials.push_back(0); | 
 |   credentials.insert(credentials.end(), account.begin(), account.end()); | 
 |   credentials.push_back(0); | 
 |   credentials.insert(credentials.end(), token.begin(), token.end()); | 
 |   std::string msg = | 
 |       "<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' " | 
 |       "mechanism='X-OAUTH2' auth:service='oauth2' " | 
 |       "auth:allow-non-google-login='true' " | 
 |       "auth:client-uses-full-bind-result='true' " | 
 |       "xmlns:auth='http://www.google.com/talk/protocol/auth'>" + | 
 |       Base64Encode(credentials) + "</auth>"; | 
 |   return msg; | 
 | } | 
 |  | 
 | // Backoff policy. | 
 | // Note: In order to ensure a minimum of 20 seconds between server errors, | 
 | // we have a 30s +- 10s (33%) jitter initial backoff. | 
 | const BackoffEntry::Policy kDefaultBackoffPolicy = { | 
 |     // Number of initial errors (in sequence) to ignore before applying | 
 |     // exponential back-off rules. | 
 |     0, | 
 |  | 
 |     // Initial delay for exponential back-off in ms. | 
 |     30 * 1000,  // 30 seconds. | 
 |  | 
 |     // Factor by which the waiting time will be multiplied. | 
 |     2, | 
 |  | 
 |     // Fuzzing percentage. ex: 10% will spread requests randomly | 
 |     // between 90%-100% of the calculated time. | 
 |     0.33,  // 33%. | 
 |  | 
 |     // Maximum amount of time we are willing to delay our request in ms. | 
 |     10 * 60 * 1000,  // 10 minutes. | 
 |  | 
 |     // Time to keep an entry from being discarded even when it | 
 |     // has no significant state, -1 to never discard. | 
 |     -1, | 
 |  | 
 |     // Don't use initial delay unless the last request was an error. | 
 |     false, | 
 | }; | 
 |  | 
 | // Used for keeping connection alive. | 
 | const int kRegularPingIntervalSeconds = 60; | 
 | const int kRegularPingTimeoutSeconds = 30; | 
 |  | 
 | // Used for diagnostic when connectivity changed. | 
 | const int kAgressivePingIntervalSeconds = 5; | 
 | const int kAgressivePingTimeoutSeconds = 10; | 
 |  | 
 | const int kConnectingTimeoutAfterNetChangeSeconds = 30; | 
 |  | 
 | }  // namespace | 
 |  | 
 | XmppChannel::XmppChannel(const std::string& account, | 
 |                          const std::string& access_token, | 
 |                          const std::string& xmpp_endpoint, | 
 |                          provider::TaskRunner* task_runner, | 
 |                          provider::Network* network) | 
 |     : account_{account}, | 
 |       access_token_{access_token}, | 
 |       xmpp_endpoint_{xmpp_endpoint}, | 
 |       network_{network}, | 
 |       backoff_entry_{&kDefaultBackoffPolicy}, | 
 |       task_runner_{task_runner}, | 
 |       iq_stanza_handler_{new IqStanzaHandler{this, task_runner}} { | 
 |   read_socket_data_.resize(4096); | 
 |   if (network) { | 
 |     network->AddConnectionChangedCallback(base::Bind( | 
 |         &XmppChannel::OnConnectivityChanged, weak_ptr_factory_.GetWeakPtr())); | 
 |   } | 
 | } | 
 |  | 
 | void XmppChannel::OnMessageRead(size_t size, ErrorPtr error) { | 
 |   read_pending_ = false; | 
 |   if (error) | 
 |     return Restart(); | 
 |   std::string msg(read_socket_data_.data(), size); | 
 |   VLOG(2) << "Received XMPP packet: '" << msg << "'"; | 
 |  | 
 |   if (!size) | 
 |     return Restart(); | 
 |  | 
 |   stream_parser_.ParseData(msg); | 
 |   WaitForMessage(); | 
 | } | 
 |  | 
 | void XmppChannel::OnStreamStart(const std::string& node_name, | 
 |                                 std::map<std::string, std::string> attributes) { | 
 |   VLOG(2) << "XMPP stream start: " << node_name; | 
 | } | 
 |  | 
 | void XmppChannel::OnStreamEnd(const std::string& node_name) { | 
 |   VLOG(2) << "XMPP stream ended: " << node_name; | 
 |   Stop(); | 
 |   if (IsConnected()) { | 
 |     // If we had a fully-established connection, restart it now. | 
 |     // However, if the connection has never been established yet (e.g. | 
 |     // authorization failed), do not restart right now. Wait till we get | 
 |     // new credentials. | 
 |     task_runner_->PostDelayedTask( | 
 |         FROM_HERE, | 
 |         base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr()), {}); | 
 |   } else if (delegate_) { | 
 |     delegate_->OnPermanentFailure(); | 
 |   } | 
 | } | 
 |  | 
 | void XmppChannel::OnStanza(std::unique_ptr<XmlNode> stanza) { | 
 |   // Handle stanza asynchronously, since XmppChannel::OnStanza() is a callback | 
 |   // from expat XML parser and some stanza could cause the XMPP stream to be | 
 |   // reset and the parser to be re-initialized. We don't want to destroy the | 
 |   // parser while it is performing a callback invocation. | 
 |   task_runner_->PostDelayedTask( | 
 |       FROM_HERE, | 
 |       base::Bind(&XmppChannel::HandleStanza, task_ptr_factory_.GetWeakPtr(), | 
 |                  base::Passed(std::move(stanza))), | 
 |       {}); | 
 | } | 
 |  | 
 | void XmppChannel::HandleStanza(std::unique_ptr<XmlNode> stanza) { | 
 |   VLOG(2) << "XMPP stanza received: " << stanza->ToString(); | 
 |  | 
 |   switch (state_) { | 
 |     case XmppState::kConnected: | 
 |       if (stanza->name() == "stream:features") { | 
 |         auto children = stanza->FindChildren("mechanisms/mechanism", false); | 
 |         for (const auto& child : children) { | 
 |           if (child->text() == "X-OAUTH2") { | 
 |             state_ = XmppState::kAuthenticationStarted; | 
 |             SendMessage(BuildXmppAuthenticateCommand(account_, access_token_)); | 
 |             return; | 
 |           } | 
 |         } | 
 |       } | 
 |       break; | 
 |     case XmppState::kAuthenticationStarted: | 
 |       if (stanza->name() == "success") { | 
 |         state_ = XmppState::kStreamRestartedPostAuthentication; | 
 |         RestartXmppStream(); | 
 |         return; | 
 |       } else if (stanza->name() == "failure") { | 
 |         if (stanza->FindFirstChild("not-authorized", false)) { | 
 |           state_ = XmppState::kAuthenticationFailed; | 
 |           return; | 
 |         } | 
 |       } | 
 |       break; | 
 |     case XmppState::kStreamRestartedPostAuthentication: | 
 |       if (stanza->name() == "stream:features" && | 
 |           stanza->FindFirstChild("bind", false)) { | 
 |         state_ = XmppState::kBindSent; | 
 |         iq_stanza_handler_->SendRequest( | 
 |             "set", "", "", "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>", | 
 |             base::Bind(&XmppChannel::OnBindCompleted, | 
 |                        task_ptr_factory_.GetWeakPtr()), | 
 |             base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr())); | 
 |         return; | 
 |       } | 
 |       break; | 
 |     default: | 
 |       if (stanza->name() == "message") { | 
 |         HandleMessageStanza(std::move(stanza)); | 
 |         return; | 
 |       } else if (stanza->name() == "iq") { | 
 |         if (!iq_stanza_handler_->HandleIqStanza(std::move(stanza))) { | 
 |           LOG(ERROR) << "Failed to handle IQ stanza"; | 
 |           CloseStream(); | 
 |         } | 
 |         return; | 
 |       } | 
 |       LOG(INFO) << "Unexpected XMPP stanza ignored: " << stanza->ToString(); | 
 |       return; | 
 |   } | 
 |   // Something bad happened. Close the stream and start over. | 
 |   LOG(ERROR) << "Error condition occurred handling stanza: " | 
 |              << stanza->ToString() << " in state: " << static_cast<int>(state_); | 
 |   CloseStream(); | 
 | } | 
 |  | 
 | void XmppChannel::CloseStream() { | 
 |   SendMessage("</stream:stream>"); | 
 | } | 
 |  | 
 | void XmppChannel::OnBindCompleted(std::unique_ptr<XmlNode> reply) { | 
 |   if (reply->GetAttributeOrEmpty("type") != "result") { | 
 |     CloseStream(); | 
 |     return; | 
 |   } | 
 |   const XmlNode* jid_node = reply->FindFirstChild("bind/jid", false); | 
 |   if (!jid_node) { | 
 |     LOG(ERROR) << "XMPP Bind response is missing JID"; | 
 |     CloseStream(); | 
 |     return; | 
 |   } | 
 |  | 
 |   jid_ = jid_node->text(); | 
 |   state_ = XmppState::kSessionStarted; | 
 |   iq_stanza_handler_->SendRequest( | 
 |       "set", "", "", "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/>", | 
 |       base::Bind(&XmppChannel::OnSessionEstablished, | 
 |                  task_ptr_factory_.GetWeakPtr()), | 
 |       base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr())); | 
 | } | 
 |  | 
 | void XmppChannel::OnSessionEstablished(std::unique_ptr<XmlNode> reply) { | 
 |   if (reply->GetAttributeOrEmpty("type") != "result") { | 
 |     CloseStream(); | 
 |     return; | 
 |   } | 
 |   state_ = XmppState::kSubscribeStarted; | 
 |   std::string body = | 
 |       "<subscribe xmlns='google:push'>" | 
 |       "<item channel='cloud_devices' from=''/></subscribe>"; | 
 |   iq_stanza_handler_->SendRequest( | 
 |       "set", "", account_, body, | 
 |       base::Bind(&XmppChannel::OnSubscribed, task_ptr_factory_.GetWeakPtr()), | 
 |       base::Bind(&XmppChannel::Restart, task_ptr_factory_.GetWeakPtr())); | 
 | } | 
 |  | 
 | void XmppChannel::OnSubscribed(std::unique_ptr<XmlNode> reply) { | 
 |   if (reply->GetAttributeOrEmpty("type") != "result") { | 
 |     CloseStream(); | 
 |     return; | 
 |   } | 
 |   state_ = XmppState::kSubscribed; | 
 |   if (delegate_) | 
 |     delegate_->OnConnected(GetName()); | 
 | } | 
 |  | 
 | void XmppChannel::HandleMessageStanza(std::unique_ptr<XmlNode> stanza) { | 
 |   const XmlNode* node = stanza->FindFirstChild("push:push/push:data", true); | 
 |   if (!node) { | 
 |     LOG(WARNING) << "XMPP message stanza is missing <push:data> element"; | 
 |     return; | 
 |   } | 
 |   std::string data = node->text(); | 
 |   std::string json_data; | 
 |   if (!Base64Decode(data, &json_data)) { | 
 |     LOG(WARNING) << "Failed to decode base64-encoded message payload: " << data; | 
 |     return; | 
 |   } | 
 |  | 
 |   VLOG(2) << "XMPP push notification data: " << json_data; | 
 |   auto json_dict = LoadJsonDict(json_data, nullptr); | 
 |   if (json_dict && delegate_) | 
 |     ParseNotificationJson(*json_dict, delegate_, GetName()); | 
 | } | 
 |  | 
 | void XmppChannel::CreateSslSocket() { | 
 |   CHECK(!stream_); | 
 |   state_ = XmppState::kConnecting; | 
 |   LOG(INFO) << "Starting XMPP connection to: " << xmpp_endpoint_; | 
 |  | 
 |   std::pair<std::string, std::string> host_port = | 
 |       SplitAtFirst(xmpp_endpoint_, ":", true); | 
 |   CHECK(!host_port.first.empty()); | 
 |   CHECK(!host_port.second.empty()); | 
 |   uint32_t port = 0; | 
 |   CHECK(base::StringToUint(host_port.second, &port)) << xmpp_endpoint_; | 
 |  | 
 |   network_->OpenSslSocket(host_port.first, port, | 
 |                           base::Bind(&XmppChannel::OnSslSocketReady, | 
 |                                      task_ptr_factory_.GetWeakPtr())); | 
 | } | 
 |  | 
 | void XmppChannel::OnSslSocketReady(std::unique_ptr<Stream> stream, | 
 |                                    ErrorPtr error) { | 
 |   if (error) { | 
 |     LOG(ERROR) << "TLS handshake failed. Restarting XMPP connection"; | 
 |     backoff_entry_.InformOfRequest(false); | 
 |  | 
 |     LOG(INFO) << "Delaying connection to XMPP server for " | 
 |               << backoff_entry_.GetTimeUntilRelease(); | 
 |     return task_runner_->PostDelayedTask( | 
 |         FROM_HERE, base::Bind(&XmppChannel::CreateSslSocket, | 
 |                               task_ptr_factory_.GetWeakPtr()), | 
 |         backoff_entry_.GetTimeUntilRelease()); | 
 |   } | 
 |   CHECK(XmppState::kConnecting == state_); | 
 |   backoff_entry_.InformOfRequest(true); | 
 |   stream_ = std::move(stream); | 
 |   state_ = XmppState::kConnected; | 
 |   RestartXmppStream(); | 
 |   ScheduleRegularPing(); | 
 | } | 
 |  | 
 | void XmppChannel::SendMessage(const std::string& message) { | 
 |   CHECK(stream_) << "No XMPP socket stream available"; | 
 |   if (write_pending_) { | 
 |     queued_write_data_ += message; | 
 |     return; | 
 |   } | 
 |   write_socket_data_ = queued_write_data_ + message; | 
 |   queued_write_data_.clear(); | 
 |   VLOG(2) << "Sending XMPP message: " << message; | 
 |  | 
 |   write_pending_ = true; | 
 |   stream_->Write( | 
 |       write_socket_data_.data(), write_socket_data_.size(), | 
 |       base::Bind(&XmppChannel::OnMessageSent, task_ptr_factory_.GetWeakPtr())); | 
 | } | 
 |  | 
 | void XmppChannel::OnMessageSent(ErrorPtr error) { | 
 |   write_pending_ = false; | 
 |   if (error) | 
 |     return Restart(); | 
 |   if (queued_write_data_.empty()) { | 
 |     WaitForMessage(); | 
 |   } else { | 
 |     SendMessage(std::string{}); | 
 |   } | 
 | } | 
 |  | 
 | void XmppChannel::WaitForMessage() { | 
 |   if (read_pending_ || !stream_) | 
 |     return; | 
 |  | 
 |   read_pending_ = true; | 
 |   stream_->Read( | 
 |       read_socket_data_.data(), read_socket_data_.size(), | 
 |       base::Bind(&XmppChannel::OnMessageRead, task_ptr_factory_.GetWeakPtr())); | 
 | } | 
 |  | 
 | std::string XmppChannel::GetName() const { | 
 |   return "xmpp"; | 
 | } | 
 |  | 
 | bool XmppChannel::IsConnected() const { | 
 |   return state_ == XmppState::kSubscribed; | 
 | } | 
 |  | 
 | void XmppChannel::AddChannelParameters(base::DictionaryValue* channel_json) { | 
 |   // No extra parameters needed for XMPP. | 
 | } | 
 |  | 
 | void XmppChannel::Restart() { | 
 |   LOG(INFO) << "Restarting XMPP"; | 
 |   Stop(); | 
 |   Start(delegate_); | 
 | } | 
 |  | 
 | void XmppChannel::Start(NotificationDelegate* delegate) { | 
 |   CHECK(state_ == XmppState::kNotStarted); | 
 |   delegate_ = delegate; | 
 |  | 
 |   CreateSslSocket(); | 
 | } | 
 |  | 
 | void XmppChannel::Stop() { | 
 |   if (IsConnected() && delegate_) | 
 |     delegate_->OnDisconnected(); | 
 |  | 
 |   task_ptr_factory_.InvalidateWeakPtrs(); | 
 |   ping_ptr_factory_.InvalidateWeakPtrs(); | 
 |  | 
 |   stream_.reset(); | 
 |   state_ = XmppState::kNotStarted; | 
 | } | 
 |  | 
 | void XmppChannel::RestartXmppStream() { | 
 |   stream_parser_.Reset(); | 
 |   stream_->CancelPendingOperations(); | 
 |   read_pending_ = false; | 
 |   write_pending_ = false; | 
 |   SendMessage(BuildXmppStartStreamCommand()); | 
 | } | 
 |  | 
 | void XmppChannel::SchedulePing(base::TimeDelta interval, | 
 |                                base::TimeDelta timeout) { | 
 |   VLOG(1) << "Next XMPP ping in " << interval << " with timeout " << timeout; | 
 |   ping_ptr_factory_.InvalidateWeakPtrs(); | 
 |   task_runner_->PostDelayedTask( | 
 |       FROM_HERE, base::Bind(&XmppChannel::PingServer, | 
 |                             ping_ptr_factory_.GetWeakPtr(), timeout), | 
 |       interval); | 
 | } | 
 |  | 
 | void XmppChannel::ScheduleRegularPing() { | 
 |   SchedulePing(base::TimeDelta::FromSeconds(kRegularPingIntervalSeconds), | 
 |                base::TimeDelta::FromSeconds(kRegularPingTimeoutSeconds)); | 
 | } | 
 |  | 
 | void XmppChannel::ScheduleFastPing() { | 
 |   SchedulePing(base::TimeDelta::FromSeconds(kAgressivePingIntervalSeconds), | 
 |                base::TimeDelta::FromSeconds(kAgressivePingTimeoutSeconds)); | 
 | } | 
 |  | 
 | void XmppChannel::PingServer(base::TimeDelta timeout) { | 
 |   VLOG(1) << "Sending XMPP ping"; | 
 |   if (!IsConnected()) { | 
 |     LOG(WARNING) << "XMPP channel is not connected"; | 
 |     Restart(); | 
 |     return; | 
 |   } | 
 |  | 
 |   // Send an XMPP Ping request as defined in XEP-0199 extension: | 
 |   // http://xmpp.org/extensions/xep-0199.html | 
 |   iq_stanza_handler_->SendRequestWithCustomTimeout( | 
 |       "get", jid_, account_, "<ping xmlns='urn:xmpp:ping'/>", timeout, | 
 |       base::Bind(&XmppChannel::OnPingResponse, task_ptr_factory_.GetWeakPtr(), | 
 |                  base::Time::Now()), | 
 |       base::Bind(&XmppChannel::OnPingTimeout, task_ptr_factory_.GetWeakPtr(), | 
 |                  base::Time::Now())); | 
 | } | 
 |  | 
 | void XmppChannel::OnPingResponse(base::Time sent_time, | 
 |                                  std::unique_ptr<XmlNode> reply) { | 
 |   VLOG(1) << "XMPP response received after " << (base::Time::Now() - sent_time); | 
 |   // Ping response received from server. Everything seems to be in order. | 
 |   // Reschedule with default intervals. | 
 |   ScheduleRegularPing(); | 
 | } | 
 |  | 
 | void XmppChannel::OnPingTimeout(base::Time sent_time) { | 
 |   LOG(WARNING) << "XMPP channel seems to be disconnected. Ping timed out after " | 
 |                << (base::Time::Now() - sent_time); | 
 |   Restart(); | 
 | } | 
 |  | 
 | void XmppChannel::OnConnectivityChanged() { | 
 |   if (state_ == XmppState::kNotStarted) | 
 |     return; | 
 |  | 
 |   if (state_ == XmppState::kConnecting && | 
 |       backoff_entry_.GetTimeUntilRelease() < | 
 |           base::TimeDelta::FromSeconds( | 
 |               kConnectingTimeoutAfterNetChangeSeconds)) { | 
 |     VLOG(1) << "Next reconnect in " << backoff_entry_.GetTimeUntilRelease(); | 
 |     return; | 
 |   } | 
 |  | 
 |   ScheduleFastPing(); | 
 | } | 
 |  | 
 | }  // namespace weave |