!1069 Use a resident process to write summary files and SummaryRecord as context manager

Merge pull request !1069 from 李鸿章/context_manager
pull/1069/MERGE
mindspore-ci-bot 5 years ago committed by Gitee
commit 6b68671805

@ -14,91 +14,74 @@
# ============================================================================
"""Writes events to disk in a logdir."""
import os
import time
import stat
from mindspore import log as logger
from collections import deque
from multiprocessing import Pool, Process, Queue, cpu_count
from ..._c_expression import EventWriter_
from ._summary_adapter import package_init_event
from ._summary_adapter import package_summary_event
def _pack(result, step):
    """Package `result` for `step` as a summary event and return it serialized.

    Args:
        result: Summary data handed to `package_summary_event`.
        step: Step value handed to `package_summary_event`.

    Returns:
        The value of `SerializeToString()` called on the packaged event.
    """
    return package_summary_event(result, step).SerializeToString()
class _WrapEventWriter(EventWriter_):
class EventWriter(Process):
"""
Wrap the c++ EventWriter object.
Creates a `EventWriter` and write event to file.
Args:
full_file_name (str): Include directory and file name.
filepath (str): Summary event file path and file name.
flush_interval (int): The flush seconds to flush the pending events to disk. Default: 120.
"""
def __init__(self, full_file_name):
if full_file_name is not None:
EventWriter_.__init__(self, full_file_name)
class EventRecord:
# Create the event file, build the C++ writer and the command queue, then
# start this process right away (construction has the side effect of
# launching `run` in a new process).
def __init__(self, filepath: str, flush_interval: int) -> None:
super().__init__()
# Create (or truncate) the file, then restrict it to owner read/write.
with open(filepath, 'w'):
os.chmod(filepath, stat.S_IWUSR | stat.S_IRUSR)
self._writer = EventWriter_(filepath)
# Bounded command queue sized at twice the CPU count.
self._queue = Queue(cpu_count() * 2)
# NOTE(review): `flush_interval` is accepted but never read in this method;
# confirm whether periodic flushing is meant to be handled elsewhere.
self.start()
def run(self):
    """Serve queued commands until an 'END' command is received.

    Commands are ``(action, data)`` tuples read from ``self._queue``:

    * ``'WRITE'``: ``str``/``bytes`` data is written as is; any other data is
      passed as the argument tuple of ``_pack`` to a worker pool and the
      packed result is written once it is ready.
    * ``'FLUSH'``: flush the underlying writer.
    * ``'END'``: stop reading commands, write the packed results that are
      still outstanding and shut the writer down.
    """
    # Function-scope import so the block stays self-contained.
    from queue import Empty

    with Pool() as pool:
        pending = deque()
        while True:
            # Write finished packed results, oldest submission first.
            while pending and pending[0].ready():
                self._writer.Write(pending.popleft().get())

            # Block on the queue instead of spinning on `empty()`. A short
            # timeout is only needed while packed results are outstanding,
            # so that they still get written when no new command arrives.
            try:
                action, data = self._queue.get(timeout=0.05 if pending else None)
            except Empty:
                continue

            if action == 'WRITE':
                if isinstance(data, (str, bytes)):
                    self._writer.Write(data)
                else:
                    pending.append(pool.apply_async(_pack, data))
            elif action == 'FLUSH':
                self._writer.Flush()
            elif action == 'END':
                break

        # Keep the drain inside the `with` block: leaving it terminates the
        # pool, which would discard results that are not finished yet.
        for res in pending:
            self._writer.Write(res.get())
        self._writer.Shut()
def write(self, data) -> None:
"""
Creates a `EventFileWriter` and write event to file.
Write the event to file.
Args:
full_file_name (str): Summary event file path and file name.
flush_time (int): The flush seconds to flush the pending events to disk. Default: 120.
data (Optional[str, Tuple[list, int]]): The data to write.
"""
def __init__(self, full_file_name: str, flush_time: int = 120):
self.full_file_name = full_file_name
# The first event will be flushed immediately.
self.flush_time = flush_time
self.next_flush_time = 0
# create event write object
self.event_writer = self._create_event_file()
self._init_event_file()
# count the events
self.event_count = 0
def _create_event_file(self):
"""Create the event write file."""
with open(self.full_file_name, 'w'):
os.chmod(self.full_file_name, stat.S_IWUSR | stat.S_IRUSR)
# create c++ event write object
event_writer = _WrapEventWriter(self.full_file_name)
return event_writer
def _init_event_file(self):
"""Send the init event to file."""
self.event_writer.Write((package_init_event()).SerializeToString())
self.flush()
return True
def write_event_to_file(self, event_str):
"""Write the event to file."""
self.event_writer.Write(event_str)
def get_data_count(self):
"""Return the event count."""
return self.event_count
def flush_cycle(self):
"""Flush file by timer."""
self.event_count = self.event_count + 1
# Flush the event writer every so often.
now = int(time.time())
if now > self.next_flush_time:
self.flush()
# update the flush time
self.next_flush_time = now + self.flush_time
def count_event(self):
"""Count event."""
logger.debug("Write the event count is %r", self.event_count)
self.event_count = self.event_count + 1
return self.event_count
self._queue.put(('WRITE', data))
def flush(self):
"""Flush the event file to disk."""
self.event_writer.Flush()
"""Flush the writer."""
self._queue.put(('FLUSH', None))
def close(self):
"""Flush the event file to disk and close the file."""
self.flush()
self.event_writer.Shut()
def close(self) -> None:
"""Close the writer."""
# Ask the writer process to stop: `run` leaves its command loop on 'END'.
self._queue.put(('END', None))
# Wait for the writer process to finish before returning.
self.join()

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

@ -53,14 +53,13 @@ def me_train_tensor(net, input_np, label_np, epoch_size=2):
_network = wrap.WithLossCell(net, loss)
_train_net = MsWrapper(wrap.TrainOneStepCell(_network, opt))
_train_net.set_train()
summary_writer = SummaryRecord(SUMMARY_DIR, file_suffix="_MS_GRAPH", network=_train_net)
with SummaryRecord(SUMMARY_DIR, file_suffix="_MS_GRAPH", network=_train_net) as summary_writer:
for epoch in range(0, epoch_size):
print(f"epoch %d" % (epoch))
output = _train_net(Tensor(input_np), Tensor(label_np))
summary_writer.record(i)
print("********output***********")
print(output.asnumpy())
summary_writer.close()
def me_infer_tensor(net, input_np):

@ -91,14 +91,13 @@ def train_summary_record_scalar_for_1(test_writer, steps, fwd_x, fwd_y):
def me_scalar_summary(steps, tag=None, value=None):
test_writer = SummaryRecord(SUMMARY_DIR_ME_TEMP)
with SummaryRecord(SUMMARY_DIR_ME_TEMP) as test_writer:
x = Tensor(np.array([1.1]).astype(np.float32))
y = Tensor(np.array([1.2]).astype(np.float32))
out_me_dict = train_summary_record_scalar_for_1(test_writer, steps, x, y)
test_writer.close()
return out_me_dict

@ -106,7 +106,7 @@ def test_graph_summary_sample():
optim = Momentum(net.trainable_params(), 0.1, 0.9)
context.set_context(mode=context.GRAPH_MODE)
model = Model(net, loss_fn=loss, optimizer=optim, metrics=None)
test_writer = SummaryRecord(SUMMARY_DIR, file_suffix="_MS_GRAPH", network=model._train_network)
with SummaryRecord(SUMMARY_DIR, file_suffix="_MS_GRAPH", network=model._train_network) as test_writer:
model.train(2, dataset)
# step 2: create the Event
for i in range(1, 5):
@ -115,7 +115,6 @@ def test_graph_summary_sample():
# step 3: send the event to mq
# step 4: accept the event and write the file
test_writer.close()
log.debug("finished test_graph_summary_sample")
@ -127,7 +126,7 @@ def test_graph_summary_callback():
optim = Momentum(net.trainable_params(), 0.1, 0.9)
context.set_context(mode=context.GRAPH_MODE)
model = Model(net, loss_fn=loss, optimizer=optim, metrics=None)
test_writer = SummaryRecord(SUMMARY_DIR, file_suffix="_MS_GRAPH", network=model._train_network)
with SummaryRecord(SUMMARY_DIR, file_suffix="_MS_GRAPH", network=model._train_network) as test_writer:
summary_cb = SummaryStep(test_writer, 1)
model.train(2, dataset, callbacks=summary_cb)
@ -139,6 +138,6 @@ def test_graph_summary_callback2():
optim = Momentum(net.trainable_params(), 0.1, 0.9)
context.set_context(mode=context.GRAPH_MODE)
model = Model(net, loss_fn=loss, optimizer=optim, metrics=None)
test_writer = SummaryRecord(SUMMARY_DIR, file_suffix="_MS_GRAPH", network=net)
with SummaryRecord(SUMMARY_DIR, file_suffix="_MS_GRAPH", network=net) as test_writer:
summary_cb = SummaryStep(test_writer, 1)
model.train(2, dataset, callbacks=summary_cb)

@ -52,12 +52,11 @@ def _wrap_test_data(input_data: Tensor):
def test_histogram_summary():
"""Test histogram summary."""
with tempfile.TemporaryDirectory() as tmp_dir:
test_writer = SummaryRecord(tmp_dir, file_suffix="_MS_HISTOGRAM")
with SummaryRecord(tmp_dir, file_suffix="_MS_HISTOGRAM") as test_writer:
test_data = _wrap_test_data(Tensor([[1, 2, 3], [4, 5, 6]]))
_cache_summary_tensor_data(test_data)
test_writer.record(step=1)
test_writer.close()
file_name = os.path.join(tmp_dir, test_writer.event_file_name)
reader = SummaryReader(file_name)
@ -68,7 +67,7 @@ def test_histogram_summary():
def test_histogram_multi_summary():
"""Test histogram multiple step."""
with tempfile.TemporaryDirectory() as tmp_dir:
test_writer = SummaryRecord(tmp_dir, file_suffix="_MS_HISTOGRAM")
with SummaryRecord(tmp_dir, file_suffix="_MS_HISTOGRAM") as test_writer:
rng = np.random.RandomState(10)
size = 50
@ -81,8 +80,6 @@ def test_histogram_multi_summary():
_cache_summary_tensor_data(test_data)
test_writer.record(step=i)
test_writer.close()
file_name = os.path.join(tmp_dir, test_writer.event_file_name)
reader = SummaryReader(file_name)
for _ in range(num_step):
@ -93,12 +90,11 @@ def test_histogram_multi_summary():
def test_histogram_summary_scalar_tensor():
"""Test histogram summary, input is a scalar tensor."""
with tempfile.TemporaryDirectory() as tmp_dir:
test_writer = SummaryRecord(tmp_dir, file_suffix="_MS_HISTOGRAM")
with SummaryRecord(tmp_dir, file_suffix="_MS_HISTOGRAM") as test_writer:
test_data = _wrap_test_data(Tensor(1))
_cache_summary_tensor_data(test_data)
test_writer.record(step=1)
test_writer.close()
file_name = os.path.join(tmp_dir, test_writer.event_file_name)
reader = SummaryReader(file_name)
@ -109,12 +105,11 @@ def test_histogram_summary_scalar_tensor():
def test_histogram_summary_empty_tensor():
"""Test histogram summary, input is an empty tensor."""
with tempfile.TemporaryDirectory() as tmp_dir:
test_writer = SummaryRecord(tmp_dir, file_suffix="_MS_HISTOGRAM")
with SummaryRecord(tmp_dir, file_suffix="_MS_HISTOGRAM") as test_writer:
test_data = _wrap_test_data(Tensor([]))
_cache_summary_tensor_data(test_data)
test_writer.record(step=1)
test_writer.close()
file_name = os.path.join(tmp_dir, test_writer.event_file_name)
reader = SummaryReader(file_name)
@ -125,7 +120,7 @@ def test_histogram_summary_empty_tensor():
def test_histogram_summary_same_value():
"""Test histogram summary, input is an ones tensor."""
with tempfile.TemporaryDirectory() as tmp_dir:
test_writer = SummaryRecord(tmp_dir, file_suffix="_MS_HISTOGRAM")
with SummaryRecord(tmp_dir, file_suffix="_MS_HISTOGRAM") as test_writer:
dim1 = 100
dim2 = 100
@ -133,7 +128,6 @@ def test_histogram_summary_same_value():
test_data = _wrap_test_data(Tensor(np.ones([dim1, dim2])))
_cache_summary_tensor_data(test_data)
test_writer.record(step=1)
test_writer.close()
file_name = os.path.join(tmp_dir, test_writer.event_file_name)
reader = SummaryReader(file_name)
@ -146,7 +140,7 @@ def test_histogram_summary_same_value():
def test_histogram_summary_high_dims():
"""Test histogram summary, input is a 4-dimension tensor."""
with tempfile.TemporaryDirectory() as tmp_dir:
test_writer = SummaryRecord(tmp_dir, file_suffix="_MS_HISTOGRAM")
with SummaryRecord(tmp_dir, file_suffix="_MS_HISTOGRAM") as test_writer:
dim = 10
rng = np.random.RandomState(0)
@ -154,7 +148,6 @@ def test_histogram_summary_high_dims():
test_data = _wrap_test_data(Tensor(tensor_data))
_cache_summary_tensor_data(test_data)
test_writer.record(step=1)
test_writer.close()
file_name = os.path.join(tmp_dir, test_writer.event_file_name)
reader = SummaryReader(file_name)
@ -167,7 +160,7 @@ def test_histogram_summary_high_dims():
def test_histogram_summary_nan_inf():
"""Test histogram summary, input tensor has nan."""
with tempfile.TemporaryDirectory() as tmp_dir:
test_writer = SummaryRecord(tmp_dir, file_suffix="_MS_HISTOGRAM")
with SummaryRecord(tmp_dir, file_suffix="_MS_HISTOGRAM") as test_writer:
dim1 = 100
dim2 = 100
@ -180,7 +173,6 @@ def test_histogram_summary_nan_inf():
_cache_summary_tensor_data(test_data)
test_writer.record(step=1)
test_writer.close()
file_name = os.path.join(tmp_dir, test_writer.event_file_name)
reader = SummaryReader(file_name)
@ -193,12 +185,11 @@ def test_histogram_summary_nan_inf():
def test_histogram_summary_all_nan_inf():
"""Test histogram summary, input tensor has no valid number."""
with tempfile.TemporaryDirectory() as tmp_dir:
test_writer = SummaryRecord(tmp_dir, file_suffix="_MS_HISTOGRAM")
with SummaryRecord(tmp_dir, file_suffix="_MS_HISTOGRAM") as test_writer:
test_data = _wrap_test_data(Tensor(np.array([np.nan, np.nan, np.nan, np.inf, -np.inf])))
_cache_summary_tensor_data(test_data)
test_writer.record(step=1)
test_writer.close()
file_name = os.path.join(tmp_dir, test_writer.event_file_name)
reader = SummaryReader(file_name)

@ -74,7 +74,7 @@ def test_image_summary_sample():
""" test_image_summary_sample """
log.debug("begin test_image_summary_sample")
# step 0: create the thread
test_writer = SummaryRecord(SUMMARY_DIR, file_suffix="_MS_IMAGE")
with SummaryRecord(SUMMARY_DIR, file_suffix="_MS_IMAGE") as test_writer:
# step 1: create the test data for summary
@ -88,8 +88,6 @@ def test_image_summary_sample():
# step 3: send the event to mq
# step 4: accept the event and write the file
test_writer.close()
log.debug("finished test_image_summary_sample")
@ -174,7 +172,7 @@ def test_image_summary_train():
log.debug("begin test_image_summary_sample")
# step 0: create the thread
test_writer = SummaryRecord(SUMMARY_DIR, file_suffix="_MS_IMAGE")
with SummaryRecord(SUMMARY_DIR, file_suffix="_MS_IMAGE") as test_writer:
# step 1: create the test data for summary
@ -188,8 +186,6 @@ def test_image_summary_train():
# step 3: send the event to mq
# step 4: accept the event and write the file
test_writer.close()
log.debug("finished test_image_summary_sample")
@ -209,18 +205,12 @@ def test_image_summary_data():
log.debug("begin test_image_summary_sample")
# step 0: create the thread
test_writer = SummaryRecord(SUMMARY_DIR, file_suffix="_MS_IMAGE")
with SummaryRecord(SUMMARY_DIR, file_suffix="_MS_IMAGE") as test_writer:
# step 1: create the test data for summary
# step 2: create the Event
_cache_summary_tensor_data(test_data_list)
test_writer.record(1)
test_writer.flush()
# step 3: send the event to mq
# step 4: accept the event and write the file
test_writer.close()
log.debug("finished test_image_summary_sample")

@ -65,7 +65,7 @@ def test_scalar_summary_sample():
""" test_scalar_summary_sample """
log.debug("begin test_scalar_summary_sample")
# step 0: create the thread
test_writer = SummaryRecord(SUMMARY_DIR, file_suffix="_MS_SCALAR")
with SummaryRecord(SUMMARY_DIR, file_suffix="_MS_SCALAR") as test_writer:
# step 1: create the test data for summary
@ -78,7 +78,6 @@ def test_scalar_summary_sample():
# step 3: send the event to mq
# step 4: accept the event and write the file
test_writer.close()
log.debug("finished test_scalar_summary_sample")
@ -110,7 +109,7 @@ def test_scalar_summary_sample_with_shape_1():
""" test_scalar_summary_sample_with_shape_1 """
log.debug("begin test_scalar_summary_sample_with_shape_1")
# step 0: create the thread
test_writer = SummaryRecord(SUMMARY_DIR, file_suffix="_MS_SCALAR")
with SummaryRecord(SUMMARY_DIR, file_suffix="_MS_SCALAR") as test_writer:
# step 1: create the test data for summary
@ -123,7 +122,6 @@ def test_scalar_summary_sample_with_shape_1():
# step 3: send the event to mq
# step 4: accept the event and write the file
test_writer.close()
log.debug("finished test_scalar_summary_sample")
@ -152,7 +150,7 @@ def test_scalar_summary_with_ge():
log.debug("begin test_scalar_summary_with_ge")
# step 0: create the thread
test_writer = SummaryRecord(SUMMARY_DIR, file_suffix="_MS_SCALAR")
with SummaryRecord(SUMMARY_DIR, file_suffix="_MS_SCALAR") as test_writer:
# step 1: create the network for summary
x = Tensor(np.array([1.1]).astype(np.float32))
@ -168,8 +166,6 @@ def test_scalar_summary_with_ge():
net(x, y)
test_writer.record(i)
# step 3: close the writer
test_writer.close()
log.debug("finished test_scalar_summary_with_ge")
@ -180,7 +176,7 @@ def test_scalar_summary_with_ge_2():
log.debug("begin test_scalar_summary_with_ge_2")
# step 0: create the thread
test_writer = SummaryRecord(SUMMARY_DIR, file_suffix="_MS_SCALAR")
with SummaryRecord(SUMMARY_DIR, file_suffix="_MS_SCALAR") as test_writer:
# step 1: create the network for summary
x = Tensor(np.array([1.1]).astype(np.float32))
@ -196,14 +192,12 @@ def test_scalar_summary_with_ge_2():
net(x, y)
test_writer.record(i)
# step 3: close the writer
test_writer.close()
log.debug("finished test_scalar_summary_with_ge_2")
def test_validate():
sr = SummaryRecord(SUMMARY_DIR)
with SummaryRecord(SUMMARY_DIR) as sr:
with pytest.raises(ValueError):
SummaryStep(sr, 0)
@ -228,7 +222,6 @@ def test_validate():
sr.record("str")
with pytest.raises(ValueError):
sr.record(sr)
sr.close()
SummaryStep(sr, 1)
with pytest.raises(ValueError):

@ -126,7 +126,7 @@ class HistogramSummaryNet(nn.Cell):
def run_case(net):
""" run_case """
# step 0: create the thread
test_writer = SummaryRecord(SUMMARY_DIR)
with SummaryRecord(SUMMARY_DIR) as test_writer:
# step 1: create the network for summary
x = Tensor(np.array([1.1]).astype(np.float32))
@ -141,8 +141,6 @@ def run_case(net):
net(x, y)
test_writer.record(i)
# step 3: close the writer
test_writer.close()
# Test 1: use the repeat tag

@ -80,7 +80,7 @@ def test_tensor_summary_sample():
""" test_tensor_summary_sample """
log.debug("begin test_tensor_summary_sample")
# step 0: create the thread
test_writer = SummaryRecord(SUMMARY_DIR, file_suffix="_MS_TENSOR")
with SummaryRecord(SUMMARY_DIR, file_suffix="_MS_TENSOR") as test_writer:
# step 1: create the Event
for i in range(1, 100):
@ -90,7 +90,6 @@ def test_tensor_summary_sample():
test_writer.record(i)
# step 2: accept the event and write the file
test_writer.close()
log.debug("finished test_tensor_summary_sample")
@ -131,7 +130,7 @@ def test_tensor_summary_with_ge():
log.debug("begin test_tensor_summary_with_ge")
# step 0: create the thread
test_writer = SummaryRecord(SUMMARY_DIR)
with SummaryRecord(SUMMARY_DIR) as test_writer:
# step 1: create the network for summary
x = Tensor(np.array([1.1]).astype(np.float32))
@ -147,7 +146,4 @@ def test_tensor_summary_with_ge():
net(x, y)
test_writer.record(i)
# step 3: close the writer
test_writer.close()
log.debug("finished test_tensor_summary_with_ge")

Loading…
Cancel
Save