You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Paddle/python/paddle/distributed/fs_wrapper.py

224 lines
5.7 KiB

# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
import sys
import abc
import os
from pathlib import PurePosixPath
import shutil
class FS(object):
@abc.abstractmethod
def list_dirs(self, fs_path):
pass
@abc.abstractmethod
def ls_dir(self, fs_path):
pass
@abc.abstractmethod
def stat(self, fs_path):
pass
@abc.abstractmethod
def upload(self, local_path, fs_path):
pass
@abc.abstractmethod
def download(self, fs_path, local_path):
pass
@abc.abstractmethod
def mkdir(self, fs_path):
pass
@abc.abstractmethod
def mv(self, fs_src_path, fs_dst_path):
pass
@abc.abstractmethod
def rmr(self, fs_path):
pass
@abc.abstractmethod
def rm(self, fs_path):
pass
@abc.abstractmethod
def delete(self, fs_path):
pass
@abc.abstractmethod
def need_upload_download(self):
pass
class LocalFS(FS):
def list_dirs(self, fs_path):
if not self.stat(fs_path):
return []
return [
f for f in os.listdir(fs_path) if os.path.isdir(fs_path + "/" + f)
]
def ls_dir(self, fs_path):
return [f for f in os.listdir(fs_path)]
def stat(self, fs_path):
return os.path.exists(fs_path)
def mkdir(self, fs_path):
assert not os.path.isfile(fs_path), "{} is already a file".format(
fs_path)
os.system("mkdir -p {}".format(fs_path))
def mv(self, fs_src_path, fs_dst_path):
os.rename(fs_src_path, fs_dst_path)
def rmr(self, fs_path):
shutil.rmtree(fs_path)
def rm(self, fs_path):
os.remove(fs_path)
def delete(self, fs_path):
if not self.stat(fs_path):
return
if os.path.isfile(fs_path):
return self.rm(fs_path)
return self.rmr(fs_path)
def need_upload_download(self):
return False
class BDFS(FS):
def __init__(self,
hdfs_name,
hdfs_ugi,
time_out=20 * 60 * 1000,
sleep_inter=1000):
self._base_cmd = "hadoop fs -Dfs.default.name=\"{}\" -Dhadoop.job.ugi=\"{}\"".format(
hdfs_name, hdfs_ugi)
self._time_out = time_out
self._sleep_inter = sleep_inter
def _run_cmd(self, cmd):
ret = fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter)
if len(ret) <= 0:
return []
lines = ret.splitlines()
return lines
def list_dirs(self, fs_path):
if not self.stat(fs_path):
return []
dirs, _ = self.ls_dir(fs_path)
return dirs
def ls_dir(self, fs_path):
"""
list directory under fs_path, and only give the pure name, not include the fs_path
"""
cmd = "{} -ls {}".format(self._base_cmd, fs_path)
lines = self._run_cmd(cmd)
dirs = []
files = []
for line in lines:
arr = line.split()
if len(arr) != 8:
continue
if fs_path not in arr[7]:
continue
p = PurePosixPath(arr[7])
if arr[0][0] == 'd':
dirs.append(p.name)
else:
files.append(p.name)
return dirs, files
def is_dir(self, fs_path):
cmd = "{} -test -d {} ; echo $?".format(self._base_cmd, fs_path)
test = self._run_cmd(cmd)
if test[0].strip() == "0":
return True
return False
def stat(self, fs_path):
cmd = "{} -test -e {} ; echo $?".format(self._base_cmd, fs_path)
test = self._run_cmd(cmd)
if test[0].strip() == "0":
return True
return False
def upload(self, local_path, fs_path):
cmd = "{} -put {} {}".format(self._base_cmd, local_path, fs_path)
fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter)
def download(self, fs_path, local_path):
cmd = "{} -get {} {}/".format(self._base_cmd, fs_path, local_path)
fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter)
def mkdir(self, fs_path):
if not self.stat(fs_path):
cmd = "{} -mkdir {}".format(self._base_cmd, fs_path)
fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter)
def mv(self, fs_src_path, fs_dst_path):
cmd = "{} -mv {} {}".format(self._base_cmd, fs_src_path, fs_dst_path)
fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter)
def rmr(self, fs_path):
if not self.stat(fs_path):
return
cmd = "{} -rmr {}".format(self._base_cmd, fs_path)
return fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter)
def rm(self, fs_path):
if not self.stat(fs_path):
return
cmd = "{} -rm {}".format(self._base_cmd, fs_path)
return fluid.core.run_cmd(cmd, self._time_out, self._sleep_inter)
def delete(self, fs_path):
if not self.stat(fs_path):
return
is_dir = self.is_dir(fs_path)
if is_dir:
return self.rmr(fs_path)
return self.rm(fs_path)
def need_upload_download(self):
return True