-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathipythonmagic.py
More file actions
99 lines (82 loc) · 3.06 KB
/
ipythonmagic.py
File metadata and controls
99 lines (82 loc) · 3.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from IPython.core import magic_arguments
from IPython.core.magic import Magics, magics_class, cell_magic, line_cell_magic, line_magic
from .pyamzi import Engine
from .utils import find_files
name_argument = magic_arguments.argument('-n', '--name',
help='Specify a name for logic server engine')
@magics_class
class AmziMagics(Magics):
def __init__(self, shell):
super(AmziMagics, self).__init__(shell)
self.engines = {}
def get_engine(self, args):
if isinstance(args, dict):
name = args.get("name", None)
else:
name = getattr(args, "name", None)
if name is None:
name = "default"
if name not in self.engines:
self.engines[name] = eng = Engine(name, load_init=True)
return self.engines[name]
@magic_arguments.magic_arguments()
@name_argument
@cell_magic
def consult(self, line, cell):
args = magic_arguments.parse_argstring(self.consult, line)
eng = self.get_engine(args)
eng.consult_str(cell)
@magic_arguments.magic_arguments()
@name_argument
@cell_magic
def reconsult(self, line, cell):
args = magic_arguments.parse_argstring(self.reconsult, line)
eng = self.get_engine(args)
eng.reconsult_str(cell)
def _line_cell_help(self, line, cell):
opts, stmt = self.parse_options(line,'n:tcp:qo',
posix=False, strict=False)
eng = self.get_engine(opts)
code = cell if cell is not None else stmt
return eng, code
@line_magic
def redo(self, line=''):
opts, stmt = self.parse_options(line,'n:tcp:qo',
posix=False, strict=False)
eng = self.get_engine(opts)
return eng.redo()
@line_cell_magic
def call_term(self, line='', cell=None):
eng, code = self._line_cell_help(line, cell)
res, term = eng.call_str(code)
return res, term
@line_cell_magic
def exec_term(self, line='', cell=None):
eng, code = self._line_cell_help(line, cell)
res, term = eng.exec_str(code)
return res, term
@line_cell_magic
def query_one(self, line='', cell=None):
eng, code = self._line_cell_help(line, cell)
res = eng.query_one(code)
return res
@line_cell_magic
def query_all(self, line='', cell=None):
eng, code = self._line_cell_help(line, cell)
res = eng.query_all(code)
return list(res)
@line_cell_magic
def find_all(self, line='', cell=None):
eng, code = self._line_cell_help(line, cell)
res = eng.find_all(code)
return res
@line_magic
def engine(self, line):
line = line.strip()
name = None if line == "" else line
return self.get_engine({"name":name})
def __del__(self):
for eng in self.engines.values():
eng.close()
def load_ipython_extension(ip):
ip.register_magics(AmziMagics)