Source code for wlauto.utils.cros_sdk

#    Copyright 2016 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import sys
import time
import os
import logging

from Queue import Queue, Empty
from threading import Thread
from subprocess import Popen, PIPE
from wlauto.utils.misc import which
from wlauto.exceptions import HostError


class OutputPollingThread(Thread):
    """Thread that copies lines from a file-like object onto a queue.

    :param out: object with a ``readline()`` method (e.g. a subprocess pipe).
    :param queue: object with a ``put()`` method that receives each line read.
    :param name: name assigned to the thread.
    """

    def __init__(self, out, queue, name):
        super(OutputPollingThread, self).__init__()
        self.out = out
        self.queue = queue
        self.stop_signal = False
        self.name = name

    def run(self):
        """Forward lines from ``out`` to ``queue`` until EOF or a stop request."""
        while True:
            line = self.out.readline()
            # readline() returns an empty value at EOF: '' for text streams,
            # b'' for byte streams. Testing falsiness handles both; comparing
            # against '' only would spin forever on a byte stream.
            if not line:
                break
            # The stop flag is only looked at after a line has arrived, so the
            # line that wakes the thread up after set_stop() is discarded.
            if self.stop_signal:
                break
            self.queue.put(line)

    def set_stop(self):
        """Ask ``run`` to exit once the next line has been read."""
        self.stop_signal = True
class CrosSdkSession(object):
    """Long-running shell session used to issue commands via ``cros_sdk``.

    A child process is started with its stdin/stdout/stderr connected to
    pipes. Two daemon :class:`OutputPollingThread` instances drain stdout and
    stderr into queues so that output can be read without blocking.

    :param cros_path: directory used as the working directory of the child
                      process and as the base for the fallback ``cros_sdk``
                      location.
    :param password: text written to the child's stdin right after it is
                     started via ``sudo -S`` (unused when already in the
                     chroot).
    :raises HostError: if ``cros_sdk`` cannot be located.
    """

    def __init__(self, cros_path, password=''):
        self.logger = logging.getLogger(self.__class__.__name__)
        # 'dut-control' being resolvable is treated as "already in the chroot".
        self.in_chroot = bool(which('dut-control'))
        on_posix = 'posix' in sys.builtin_module_names
        if self.in_chroot:
            self.cros_sdk_session = Popen(['/bin/sh'], bufsize=1, stdin=PIPE, stdout=PIPE, stderr=PIPE,
                                          cwd=cros_path, close_fds=on_posix, shell=True)
        else:
            cros_sdk_bin_path = which('cros_sdk')
            # Fall back to the depot_tools copy under the given checkout path.
            potential_path = os.path.join(cros_path, "chromium/tools/depot_tools/cros_sdk")
            if not cros_sdk_bin_path and os.path.isfile(potential_path):
                cros_sdk_bin_path = potential_path
            if not cros_sdk_bin_path:
                raise HostError("Failed to locate 'cros_sdk' make sure it is in your PATH")
            self.cros_sdk_session = Popen(['sudo -Sk {}'.format(cros_sdk_bin_path)], bufsize=1,
                                          stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cros_path,
                                          close_fds=on_posix, shell=True)
            # 'sudo -S' takes the password from stdin.
            self.cros_sdk_session.stdin.write(password)
            self.cros_sdk_session.stdin.write('\n')
            # Do not rely on line buffering to deliver the password.
            self.cros_sdk_session.stdin.flush()

        self.stdout_queue = Queue()
        self.stdout_thread = OutputPollingThread(self.cros_sdk_session.stdout, self.stdout_queue, 'stdout')
        self.stdout_thread.daemon = True
        self.stdout_thread.start()

        self.stderr_queue = Queue()
        self.stderr_thread = OutputPollingThread(self.cros_sdk_session.stderr, self.stderr_queue, 'stderr')
        self.stderr_thread.daemon = True
        self.stderr_thread.start()

    def kill_session(self):
        """Stop both polling threads and kill the child process."""
        self.stdout_thread.set_stop()
        self.stderr_thread.set_stop()
        # The polling threads are blocked in readline(); emit one line on each
        # stream so they wake up, notice the stop flag and exit.
        self.send_command('echo TERMINATE >&1')
        self.send_command('echo TERMINATE 1>&2')
        self.stdout_thread.join()
        self.stderr_thread.join()
        self.cros_sdk_session.kill()

    def send_command(self, cmd, flush=True):
        """Write ``cmd`` to the session's stdin, newline-terminated.

        :param cmd: command text; a trailing newline is appended if missing.
        :param flush: when true, flush stdin after writing.
        """
        if not cmd.endswith('\n'):
            cmd = cmd + '\n'
        self.logger.debug(cmd.strip())
        self.cros_sdk_session.stdin.write(cmd)
        if flush:
            self.cros_sdk_session.stdin.flush()

    def read_line(self, timeout=0):
        """Return the next queued stdout line, or ``None`` if none is available."""
        return _read_line_from_queue(self.stdout_queue,
                                     timeout=timeout,
                                     logger=self.logger)

    def read_stderr_line(self, timeout=0):
        """Return the next queued stderr line, or ``None`` if none is available."""
        return _read_line_from_queue(self.stderr_queue,
                                     timeout=timeout,
                                     logger=self.logger)

    def get_lines(self, timeout=0, timeout_only_for_first_line=True, from_stderr=False):
        """Collect queued output lines until the queue runs dry.

        :param timeout: passed to the per-line read.
        :param timeout_only_for_first_line: when true, ``timeout`` is reset to
            0 once the first non-empty line has been read.
        :param from_stderr: read from the stderr queue instead of stdout.
        :returns: list of the non-empty lines read (empty lines are skipped).
        """
        read = self.read_stderr_line if from_stderr else self.read_line
        lines = []
        while True:
            line = read(timeout)
            if line is None:
                break
            if line:
                lines.append(line)
                if timeout and timeout_only_for_first_line:
                    timeout = 0  # after a line has been read, no further delay is required
        return lines
def _read_line_from_queue(queue, timeout=0, logger=None):
    """Fetch one line from ``queue`` without blocking on the queue itself.

    If nothing is queued and ``timeout`` is truthy, sleep for ``timeout``
    seconds and try exactly once more.

    :param queue: queue holding lines of text.
    :param timeout: seconds to sleep before the single retry; 0 disables it.
    :param logger: optional logger; non-empty lines are logged at debug level.
    :returns: the line with leading/trailing newline characters removed, or
              ``None`` when nothing was available.
    """
    def poll():
        try:
            return queue.get_nowait()
        except Empty:
            return None

    text = poll()
    if text is None and timeout:
        # Sleep for the whole interval, then make one more attempt.
        time.sleep(timeout)
        text = poll()

    if text is None:
        return None

    text = text.strip('\n')
    if logger and text:
        logger.debug(text)
    return text