# Copyright 2016 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=W0613,E1101,attribute-defined-outside-init
from __future__ import division
import os
import subprocess
import signal
import csv
import threading
import time
import getpass
import logging
import xmlrpclib
from datetime import datetime
from wlauto import Instrument, Parameter, Executable
from wlauto.exceptions import InstrumentError, ConfigError
from wlauto.utils.types import list_of_strings
from wlauto.utils.misc import check_output
from wlauto.utils.cros_sdk import CrosSdkSession
from wlauto.utils.misc import which
[docs]class ServoPowerMonitor(Instrument):
name = 'servo_power'
description = """
Collects power traces using the Chromium OS Servo Board.
Servo is a debug board used for Chromium OS test and development. Among other uses, it allows
access to the built in power monitors (if present) of a Chrome OS device. More information on
Servo board can be found in the link bellow:
https://www.chromium.org/chromium-os/servo
In order to use this instrument you need to be a sudoer and you need a chroot environment. More
information on the chroot environment can be found on the link bellow:
https://www.chromium.org/chromium-os/developer-guide
If you wish to run servod on a remote machine you will need to allow it to accept external connections
using the `--host` command line option, like so:
`sudo servod -b some_board -c some_board.xml --host=''`
"""
parameters = [
Parameter('power_domains', kind=list_of_strings, default=[],
description="""The names of power domains to be monitored by the
instrument using servod."""),
Parameter('labels', kind=list_of_strings, default=[],
description="""Meaningful labels for each of the monitored domains."""),
Parameter('chroot_path', kind=str,
description="""Path to chroot direcory on the host."""),
Parameter('sampling_rate', kind=int, default=10,
description="""Samples per second."""),
Parameter('board_name', kind=str, mandatory=True,
description="""The name of the board under test."""),
Parameter('autostart', kind=bool, default=True,
description="""Automatically start `servod`. Set to `False` if you want to
use an already running `servod` instance or a remote servo"""),
Parameter('host', kind=str, default="localhost",
description="""When `autostart` is set to `False` you can specify the host
on which `servod` is running allowing you to remotelly access
as servo board.
if `autostart` is `True` this parameter is ignored and `localhost`
is used instead"""),
Parameter('port', kind=int, default=9999,
description="""When `autostart` is set to false you must provide the port
that `servod` is running on
If `autostart` is `True` this parameter is ignored and the port
output during the startup of `servod` will be used."""),
Parameter('vid', kind=str,
description="""When more than one servo is plugged in, you must provide
a vid/pid pair to identify the servio you wish to use."""),
Parameter('pid', kind=str,
description="""When more than one servo is plugged in, you must provide
a vid/pid pair to identify the servio you wish to use."""),
]
# When trying to initialize servod, it may take some time until the server is up
# Therefore we need to poll to identify when the sever has successfully started
# servod_max_tries specifies the maximum number of times we will check to see if the server has started
# while servod_delay_between_tries is the sleep time between checks.
servod_max_tries = 100
servod_delay_between_tries = 0.1
def validate(self):
# pylint: disable=access-member-before-definition
if self.labels and len(self.power_domains) != len(self.labels):
raise ConfigError('There should be exactly one label per power domain')
if self.autostart:
if self.host != 'localhost': # pylint: disable=access-member-before-definition
self.logger.warning('Ignoring host "%s" since autostart is set to "True"', self.host)
self.host = "localhost"
if (self.vid is None) != (self.pid is None):
raise ConfigError('`vid` and `pid` must both be specified')
def initialize(self, context):
# pylint: disable=access-member-before-definition
self.poller = None
self.data = None
self.stopped = True
if self.device.platform != "chromeos":
raise InstrumentError("servo_power instrument only supports Chrome OS devices.")
if not self.labels:
self.labels = ["PORT_{}".format(channel) for channel, _ in enumerate(self.power_domains)]
self.power_domains = [channel if channel.endswith("_mw") else
"{}_mw".format(channel) for channel in self.power_domains]
self.label_map = {pd: l for pd, l in zip(self.power_domains, self.labels)}
if self.autostart:
self._start_servod()
[docs] def setup(self, context):
# pylint: disable=access-member-before-definition
self.outfile = os.path.join(context.output_directory, 'servo.csv')
self.poller = PowerPoller(self.host, self.port, self.power_domains, self.sampling_rate)
[docs] def start(self, context):
self.poller.start()
self.stopped = False
[docs] def stop(self, context):
self.data = self.poller.stop()
self.poller.join()
self.stopped = True
timestamps = self.data.pop("timestamp")
for channel, data in self.data.iteritems():
label = self.label_map[channel]
data = [float(v) / 1000.0 for v in data]
sample_sum = sum(data)
metric_name = '{}_power'.format(label)
power = sample_sum / len(data)
context.result.add_metric(metric_name, round(power, 3), 'Watts')
metric_name = '{}_energy'.format(label)
energy = sample_sum * (1.0 / self.sampling_rate)
context.result.add_metric(metric_name, round(energy, 3), 'Joules')
with open(self.outfile, 'wb') as f:
c = csv.writer(f)
headings = ['timestamp'] + ['{}_power'.format(label) for label in self.labels]
c.writerow(headings)
for row in zip(timestamps, *self.data.itervalues()):
c.writerow(row)
[docs] def teardown(self, context):
if not self.stopped:
self.stop(context)
if self.autostart:
self.server_session.kill_session()
def _start_servod(self):
in_chroot = False if which('dut-control') is None else True
password = ''
if not in_chroot:
msg = 'Instrument %s requires sudo access on this machine to start `servod`'
self.logger.info(msg, self.name)
self.logger.info('You need to be sudoer to use it.')
password = getpass.getpass()
check = subprocess.call('echo {} | sudo -S ls > /dev/null'.format(password), shell=True)
if check:
raise InstrumentError('Given password was either wrong or you are not a sudoer')
self.server_session = CrosSdkSession(self.chroot_path, password=password)
password = ''
command = 'sudo servod -b {b} -c {b}.xml'
if self.vid and self.pid:
command += " -v " + self.vid
command += " -p " + self.pid
command += '&'
self.server_session.send_command(command.format(b=self.board_name))
for _ in xrange(self.servod_max_tries):
server_lines = self.server_session.get_lines(timeout=1, from_stderr=True,
timeout_only_for_first_line=False)
if server_lines:
if 'Listening on' in server_lines[-1]:
self.port = int(server_lines[-1].split()[-1])
break
time.sleep(self.servod_delay_between_tries)
else:
raise InstrumentError('Failed to start servod in cros_sdk environment')
[docs]class PowerPoller(threading.Thread):
def __init__(self, host, port, channels, sampling_rate):
super(PowerPoller, self).__init__()
self.proxy = xmlrpclib.ServerProxy("http://{}:{}/".format(host, port))
self.proxy.get(channels[1]) # Testing connection
self.channels = channels
self.data = {channel: [] for channel in channels}
self.data['timestamp'] = []
self.period = 1.0 / sampling_rate
self.term_signal = threading.Event()
self.term_signal.set()
self.logger = logging.getLogger(self.__class__.__name__)
[docs] def run(self):
while self.term_signal.is_set():
self.data['timestamp'].append(str(datetime.now()))
for channel in self.channels:
self.data[channel].append(float(self.proxy.get(channel)))
time.sleep(self.period)
[docs] def stop(self):
self.term_signal.clear()
self.join()
return self.data