# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""HDFS Utils."""
import os
import sys
import subprocess
import multiprocessing
from datetime import datetime
import re
import copy
import errno
import time
import logging
import six
from . import fs
from .fs import FS, LocalFS, FSFileExistsError, FSFileNotExistsError, ExecuteError, FSTimeOut, FSShellCmdAborted
from paddle.fluid import core
import functools
from pathlib import PurePosixPath, Path
import shutil
__all__ = ["HDFSClient"]
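

# Example usage (an illustrative sketch; the hadoop_home, config values and
# HDFS paths below are assumptions, not part of this module):
#
#     client = HDFSClient(
#         hadoop_home="/usr/local/hadoop",
#         configs={
#             "fs.default.name": "hdfs://example.hadoop.com:54310",
#             "hadoop.job.ugi": "user,passwd",
#         })
#     client.mkdirs("hdfs:/test_hdfs_client")
#     client.upload("./local_file", "hdfs:/test_hdfs_client/remote_file")
#     dirs, files = client.ls_dir("hdfs:/test_hdfs_client")
#     client.delete("hdfs:/test_hdfs_client")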


def _handle_errors(max_time_out=None):
    def decorator(f):
        @functools.wraps(f)
        def handler(*args, **kwargs):
            o = args[0]
            time_out = max_time_out
            if time_out is None:
                time_out = float(o._time_out) / 1000.0
            else:
                time_out /= 1000.0

            inter = float(o._sleep_inter) / 1000.0
            start = time.time()
            last_print_time = start
            while True:
                try:
                    return f(*args, **kwargs)
                # important: only ExecuteError is retried
                except ExecuteError:
                    if time.time() - start >= time_out:
                        raise FSTimeOut("args:{} timeout:{}".format(
                            args, time.time() - start))

                    time.sleep(inter)

                if time.time() - last_print_time > 30:
                    print("hadoop operator timeout:args:{} timeout:{}".format(
                        args, time.time() - start))
                    last_print_time = time.time()

        return handler

    return decorator
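

# Note: `_handle_errors` retries only `ExecuteError`; any other exception
# propagates immediately, and `FSTimeOut` is raised once the retry window is
# exhausted. An illustrative (hypothetical) way to give a single operation a
# longer retry window, in milliseconds per the decorator's signature:
#
#     @_handle_errors(max_time_out=60 * 60 * 1000)  # retry for up to one hour
#     def _try_upload(self, local_path, fs_path):
#         ...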


class HDFSClient(FS):
    def __init__(
            self,
            hadoop_home,
            configs,
            time_out=5 * 60 * 1000,  # ms
            sleep_inter=1000):  # ms
        # Raise an exception if JAVA_HOME does not exist.
        self.pre_commands = []
        hadoop_bin = '%s/bin/hadoop' % hadoop_home
        self.pre_commands.append(hadoop_bin)
        dfs = 'fs'
        self.pre_commands.append(dfs)

        if configs:
            for k, v in six.iteritems(configs):
                config_command = '-D%s=%s' % (k, v)
                self.pre_commands.append(config_command)

        self._time_out = time_out
        self._sleep_inter = sleep_inter
        self._base_cmd = " ".join(self.pre_commands)
        self._bd_err_re = re.compile(
            r'\s?responseErrorMsg\s?\:.*, errorCode\:\s?[0-9]+, path\:')

    def _run_cmd(self, cmd, redirect_stderr=False):
        # The executed command is "<hadoop_home>/bin/hadoop fs -D<k>=<v> ... -<cmd>",
        # e.g. cmd="ls /path" runs ".../bin/hadoop fs -D<k>=<v> -ls /path".
        exe_cmd = "{} -{}".format(self._base_cmd, cmd)
        ret, output = core.shell_execute_cmd(exe_cmd, 0, 0, redirect_stderr)
        ret = int(ret)
        if ret == 134:
            # exit code 134 == 128 + SIGABRT: the shell command was aborted
            raise FSShellCmdAborted(cmd)
        return ret, output.splitlines()

    @_handle_errors()
    def list_dirs(self, fs_path):
        if not self.is_exist(fs_path):
            return []

        dirs, files = self._ls_dir(fs_path)
        return dirs

    @_handle_errors()
    def ls_dir(self, fs_path):
        """
        List the entries under fs_path and return (dirs, files). Only base
        names are returned; fs_path itself is not included.
        """
        if not self.is_exist(fs_path):
            return [], []

        return self._ls_dir(fs_path)

    def _ls_dir(self, fs_path):
        cmd = "ls {}".format(fs_path)
        ret, lines = self._run_cmd(cmd)

        if ret != 0:
            raise ExecuteError(cmd)

        dirs = []
        files = []
        for line in lines:
            # A valid `hadoop fs -ls` entry has 8 whitespace-separated fields:
            # permissions, replication, owner, group, size, date, time, path.
            arr = line.split()
            if len(arr) != 8:
                continue

            p = PurePosixPath(arr[7])
            if arr[0][0] == 'd':
                dirs.append(p.name)
            else:
                files.append(p.name)

        return dirs, files

    def _test_match(self, lines):
        for l in lines:
            m = self._bd_err_re.match(l)
            if m is not None:
                return m

        return None

    @_handle_errors()
    def is_dir(self, fs_path):
        if not self.is_exist(fs_path):
            return False

        return self._is_dir(fs_path)

    def _is_dir(self, fs_path):
        cmd = "test -d {}".format(fs_path)
        ret, lines = self._run_cmd(cmd, redirect_stderr=True)
        if ret:
            # a matching error line means the command failed for some other reason
            if self._test_match(lines):
                raise ExecuteError(cmd)

            return False

        return True

    def is_file(self, fs_path):
        if not self.is_exist(fs_path):
            return False

        return not self._is_dir(fs_path)

    @_handle_errors()
    def is_exist(self, fs_path):
        cmd = "ls {} ".format(fs_path)
        ret, out = self._run_cmd(cmd, redirect_stderr=True)
        if ret != 0:
            for l in out:
                if "No such file or directory" in l:
                    return False
            raise ExecuteError(cmd)

        return True

    # can't retry here: retries (and cleanup on failure) happen in _try_upload
    def upload(self, local_path, fs_path):
        if self.is_exist(fs_path):
            raise FSFileExistsError("{} exists".format(fs_path))

        local = LocalFS()
        if not local.is_exist(local_path):
            raise FSFileNotExistsError("{} does not exist".format(local_path))

        return self._try_upload(local_path, fs_path)

    @_handle_errors()
    def _try_upload(self, local_path, fs_path):
        cmd = "put {} {}".format(local_path, fs_path)
        ret = 0
        try:
            ret, lines = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)
        except Exception as e:
            self.delete(fs_path)
            raise e

    # can't retry here: retries (and cleanup on failure) happen in _try_download
    def download(self, fs_path, local_path):
        if self.is_exist(local_path):
            raise FSFileExistsError("{} exists".format(local_path))

        if not self.is_exist(fs_path):
            raise FSFileNotExistsError("{} does not exist".format(fs_path))

        return self._try_download(fs_path, local_path)

    @_handle_errors()
    def _try_download(self, fs_path, local_path):
        cmd = "get {} {}".format(fs_path, local_path)
        ret = 0
        try:
            ret, lines = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)
        except Exception as e:
            local_fs = LocalFS()
            local_fs.delete(local_path)
            raise e

    @_handle_errors()
    def mkdirs(self, fs_path):
        if self.is_exist(fs_path):
            return

        out_hdfs = False

        cmd = "mkdir {} ".format(fs_path)
        ret, out = self._run_cmd(cmd, redirect_stderr=True)
        if ret != 0:
            # a plain `mkdir` fails when a parent directory is missing;
            # fall back to `mkdir -p` below in that case
            for l in out:
                if "No such file or directory" in l:
                    out_hdfs = True
                    break
            if not out_hdfs:
                raise ExecuteError(cmd)

        if out_hdfs and not self.is_exist(fs_path):
            cmd = "mkdir -p {}".format(fs_path)
            ret, lines = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)

    def mv(self, fs_src_path, fs_dst_path, overwrite=False, test_exists=True):
        if overwrite and self.is_exist(fs_dst_path):
            self.delete(fs_dst_path)

        if test_exists:
            if not self.is_exist(fs_src_path):
                raise FSFileNotExistsError(
                    "{} does not exist".format(fs_src_path))

            if self.is_exist(fs_dst_path):
                raise FSFileExistsError(
                    "{} already exists".format(fs_dst_path))

        return self._try_mv(fs_src_path, fs_dst_path)

    @_handle_errors()
    def _try_mv(self, fs_src_path, fs_dst_path):
        cmd = "mv {} {}".format(fs_src_path, fs_dst_path)
        ret = 0
        try:
            ret, _ = self._run_cmd(cmd)
            if ret != 0:
                raise ExecuteError(cmd)
        except Exception as e:
            if not self.is_exist(fs_src_path) and \
                    self.is_exist(fs_dst_path):
                return

            raise e

    def _rmr(self, fs_path):
        cmd = "rmr {}".format(fs_path)
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError(cmd)

    def _rm(self, fs_path):
        cmd = "rm {}".format(fs_path)
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError(cmd)

    @_handle_errors()
    def delete(self, fs_path):
        if not self.is_exist(fs_path):
            return

        is_dir = self._is_dir(fs_path)
        if is_dir:
            return self._rmr(fs_path)

        return self._rm(fs_path)

    def touch(self, fs_path, exist_ok=True):
        if self.is_exist(fs_path):
            if exist_ok:
                return
            raise FSFileExistsError("{} exists".format(fs_path))

        return self._touchz(fs_path)

    @_handle_errors()
    def _touchz(self, fs_path):
        cmd = "touchz {}".format(fs_path)
        ret, _ = self._run_cmd(cmd)
        if ret != 0:
            raise ExecuteError(cmd)

    def need_upload_download(self):
        return True
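

# Minimal smoke-test sketch (illustrative only): the hadoop_home, config values
# and HDFS paths below are assumptions; point them at a real cluster before
# running this module directly.
if __name__ == "__main__":
    client = HDFSClient(
        hadoop_home="/usr/local/hadoop",  # assumed Hadoop install location
        configs={
            "fs.default.name": "hdfs://example.hadoop.com:54310",  # assumed
            "hadoop.job.ugi": "user,passwd",  # assumed credentials
        })

    test_dir = "hdfs:/tmp/hdfs_client_smoke_test"  # assumed scratch path
    client.mkdirs(test_dir)
    client.touch(test_dir + "/flag")
    print(client.ls_dir(test_dir))
    client.delete(test_dir)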