Newer
Older
# Mantid Repository : https://github.com/mantidproject/mantid
# Copyright © 2017 ISIS Rutherford Appleton Laboratory UKRI,
# NScD Oak Ridge National Laboratory, European Spallation Source
# & Institut Laue - Langevin
# SPDX - License - Identifier: GPL - 3.0 +
# This file is part of the mantidqt package
from __future__ import absolute_import, unicode_literals
import __future__
import ast
try:
import builtins
except ImportError:
import __main__
builtins = __main__.__builtins__
import copy
from lib2to3.pgen2.tokenize import detect_encoding
from qtpy.QtCore import QObject, Signal
from qtpy.QtWidgets import QApplication
from mantidqt.utils import AddedToSysPath
from mantidqt.utils.asynchronous import AsyncTask, BlockingAsyncTaskWithCallback
from mantidqt.utils.qt import import_qt
# Core object to execute the code with optinal progress tracking
CodeExecution = import_qt('..._common', 'mantidqt.widgets.codeeditor.execution', 'CodeExecution')
EMPTY_FILENAME_ID = '<string>'
FILE_ATTR = '__file__'
def _get_imported_from_future(code_str):
"""
Parse the given code and return a list of names that are imported
from __future__.
:param code_str: The code to parse
:return list: List of names that are imported from __future__
"""
future_imports = []
code_str = code_str.encode(detect_encoding(BytesIO(code_str.encode()).readline)[0])
except UnicodeEncodeError: # Script contains unicode symbol. Cannot run detect_encoding as it requires ascii.
code_str = code_str.encode('utf-8')
for node in ast.walk(ast.parse(code_str)):
if isinstance(node, ast.ImportFrom):
if node.module == '__future__':
future_imports.extend([import_alias.name for import_alias in node.names])
return future_imports
def get_future_import_compiler_flags(code_str):
"""
Get the compiler flags that can be passed to `compile` that
correspond to the __future__ imports inside the given code.
:param code_str: The code being executed, containing __future__ imports
:return int: The 'bitwise or' union of compiler flags
"""
flags = 0
for f_import_str in _get_imported_from_future(code_str):
try:
future_import = getattr(__future__, f_import_str)
flags |= future_import.compiler_flag
except AttributeError:
# Just pass and let the ImportError be raised on script execution
pass
return flags
class PythonCodeExecution(QObject):
"""Provides the ability to execute arbitrary
strings of Python code. It supports
reporting progress updates in asynchronous execution
"""
sig_exec_success = Signal(object)
sig_exec_error = Signal(object)
def __init__(self, editor=None):
"""Initialize the object"""
super(PythonCodeExecution, self).__init__()
self._editor = editor
self._globals_ns = None
@property
def globals_ns(self):
return self._globals_ns
if self._task is not None:
self._task.abort()
def execute_async(self, code_str, filename=None, blocking=False):
"""
Execute the given code string on a separate thread. This function
returns as soon as the new thread starts
:param code_str: A string containing code to execute
:param filename: See PythonCodeExecution.execute()
:param blocking: If True the call will block until the task is finished
:returns: The created async task, only returns task if the blocking is False
# Stack is chopped on error to avoid the AsyncTask.run->self.execute calls appearing
# as these are not useful for the user in this context
if not blocking:
task = AsyncTask(self.execute, args=(code_str, filename),
success_cb=self._on_success, error_cb=self._on_error)
task.start()
self._task = BlockingAsyncTaskWithCallback(self.execute, args=(code_str, filename),
success_cb=self._on_success, error_cb=self._on_error,
blocking_cb=QApplication.processEvents)
def execute(self, code_str, filename=None):
"""Execute the given code on the calling thread
within the provided context.
:param code_str: A string containing code to execute
:param filename: An optional identifier specifying the file source of the code. If None then '<string>'
is used
:raises: Any error that the code generates
"""
if not filename:
filename = EMPTY_FILENAME_ID
self.globals_ns[FILE_ATTR] = filename
flags = get_future_import_compiler_flags(code_str)
with AddedToSysPath([os.path.dirname(filename)]):
executor = CodeExecution(self._editor)
executor.execute(code_str, filename, flags, self.globals_ns)
def reset_context(self):
# create new context for execution
self._globals_ns = copy.copy(builtins.globals())
# --------------------- Callbacks -------------------------------
def _on_success(self, task_result):
self.sig_exec_success.emit(task_result)
def _on_error(self, task_error):
self.sig_exec_error.emit(task_error)
# --------------------- Private -------------------------------
def _reset_task(self):