Java Ftp连接池
某项目由于网络关系,传输文件通过ftp方式,即第三方实时把碎片文件放到ftp,我方需要把ftp文件下载下来进行业务处理。每个文件在100KB左右,每日文件数量在300万左右,平均需要约35个文件/秒(3,000,000 ÷ 86,400 ≈ 34.7)的处理速度。前同事直接用在网上找的ftp demo,每次上传/下载文件都创建新的连接,且单线程处理文件,导致处理性能只有1个文件/秒左右。当我们发现时,堆积的文件已经快撑爆了ftp服务器。
所以需要对现有代码进行优化:将现有的单线程改为多线程,把每个文件作为一个任务提交到线程池处理;创建ftp连接池,为每个线程提供单独的ftp连接。当线程下载/上传完成一个文件后,立即将连接归还给ftp连接池,以提高连接的复用效率。
优化Ftp服务器配置
如果采用多线程消费ftp上文件,并且采用ftp连接池维护ftp连接,需要确认ftp服务的配置是否支持多个连接。项目采用vsftpd,所以需要查看/etc/vsftpd/vsftpd.conf配置文件。
- pasv_max_port=0:设置在PASV工作方式下,数据连接可以使用的端口范围的上界。默认值为0,表示任意端口。
- pasv_min_port=0:设置在PASV工作方式下,数据连接可以使用的端口范围的下界。默认值为0,表示任意端口。
- max_clients=0:设置vsftpd允许的最大连接数,默认为0,表示不受限制。若设置为150,则同时允许150个连接,超出的连接将被拒绝。只有在以standalone模式运行时才有效。
- max_per_ip=0:设置每个IP地址允许与FTP服务器同时建立的连接数,默认为0,表示不受限制。通常可通过此配置防止同一个用户建立过多的连接。只有在以standalone模式运行时才有效。注意:连接池的所有连接都来自同一个IP,因此连接池的最大连接数不能超过该值,否则多出的连接会被服务器拒绝。
配置完成后重启ftp服务。如果ftp连接失败,可以检查是否是防火墙/安全策略限制了端口放行(被动模式下除了21控制端口,还需要放行pasv_min_port~pasv_max_port范围内的数据端口)。
连接池
引入Maven依赖
<!-- https://mvnrepository.com/artifact/commons-net/commons-net -->
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.10.0</version>
</dependency>
Ftp连接配置
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
/**
 * FTP connection settings bound from Spring properties under the {@code ftp.*} prefix.
 *
 * <p>Lombok {@code @Data} generates the getters read by {@code FtpConnection} and {@code FtpFactory}.
 * The field initializers repeat the {@code @Value} defaults, so an instance created with {@code new}
 * outside Spring carries the same defaults (host, credentials and port stay unset in that case).
 */
@Configuration
@Data
public class FtpProperties {
    /** FTP server host name or address ({@code ftp.hostname}, required). */
    @Value("${ftp.hostname}")
    private String host;
    /** Login user name ({@code ftp.username}, required). */
    @Value("${ftp.username}")
    private String username;
    /** Login password ({@code ftp.password}, required). */
    @Value("${ftp.password}")
    private String password;
    /** FTP control port ({@code ftp.port}, required). */
    @Value("${ftp.port}")
    private int port;
    /** Default socket timeout in seconds; FtpConnection multiplies it by 1000 for FTPClient#setDefaultTimeout. */
    @Value("${ftp.timeout.default.seconds:1200}")
    private Integer defaultTimeoutSecond = 1200;
    /** Connect timeout in seconds; FtpConnection multiplies it by 1000 for FTPClient#setConnectTimeout. */
    @Value("${ftp.timeout.connect.seconds:1800}")
    private Integer connectTimeoutSecond = 1800;
    /** Data-connection timeout in seconds; FtpConnection multiplies it by 1000 for FTPClient#setDataTimeout. */
    @Value("${ftp.timeout.data.seconds:2400}")
    private Integer dataTimeoutSecond = 2400;
    /** Charset name used both for FTPClient#setCharset and as the control-channel encoding. */
    @Value("${ftp.charSet:UTF-8}")
    private String charSet = "UTF-8";
    /** Multiplier applied to the available processor count to obtain FtpFactory's maximum pool size. */
    @Value("${ftp.factory.thread.times:2}")
    private Integer threadTimes = 2;
}
Ftp连接对象
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import java.io.IOException;
import java.io.OutputStream;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* ftp连接
*/
@Slf4j
public class FtpConnection {
private final FTPClient ftp = new FTPClient();
private final AtomicBoolean isConnected = new AtomicBoolean(false);
/**
* 构造函数
*/
public FtpConnection(FtpProperties ftpProperties) {
ftp.setDefaultTimeout(ftpProperties.getDefaultTimeoutSecond() * 1000);
ftp.setConnectTimeout(ftpProperties.getConnectTimeoutSecond() * 1000);
ftp.setDataTimeout(ftpProperties.getDataTimeoutSecond() * 1000);
ftp.setCharset(Charset.forName(ftpProperties.getCharSet()));
ftp.setControlEncoding(ftpProperties.getCharSet());
//被动模式
ftp.enterLocalPassiveMode();
try {
initConnect(ftpProperties.getHost(), ftpProperties.getPort(),
ftpProperties.getUsername(), ftpProperties.getPassword());
} catch (IOException e) {
isConnected.set(false);
log.error("init ftp client error", e);
}
}
/**
* 初始化连接
*/
private void initConnect(String host, int port, String user, String password) throws IOException {
try {
ftp.connect(host, port);
} catch (UnknownHostException ex) {
throw new IOException("Can't find FTP server '" + host + "'");
}
int reply = ftp.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
disconnect();
throw new IOException("Can't connect to server '" + host + "'");
}
if (!ftp.login(user, password)) {
isConnected.set(false);
disconnect();
throw new IOException("Can't login to server '" + host + "'");
} else {
isConnected.set(true);
}
}
public List<String> fileNames(String path) {
try {
//获取文件列表使用这个接口最好,命令行效率更高
String[] strings = ftp.listNames(path);
return strings == null ? new ArrayList<>() : Stream.of(strings).collect(Collectors.toList());
} catch (IOException e) {
log.error("ftp listNames error.path = {}", path);
return new ArrayList<>();
}
}
public String download(String path, String ftpFileName, String localPath, boolean deleteSuccessFile, boolean deleteErrorFile) {
String fileName = localPath + ftpFileName.substring(path.length());
boolean deleted = false;
try (OutputStream os = Files.newOutputStream(Paths.get(fileName))) {
//保存文件
boolean success = ftp.retrieveFile(ftpFileName, os);
if (success) {
//是否需要删除源文件
if (deleteSuccessFile) {
deleted = delete(ftpFileName);
}
return fileName;
}
} catch (Exception e) {
log.error("ftp download error", e);
if (deleteErrorFile) {
//对于下载失败的文件直接删除
deleted = delete(ftpFileName);
}
} finally {
if (deleteSuccessFile || deleteErrorFile) {
log.debug("ftp download then delete file {}", deleted ? "success" : "failed");
}
}
return null;
}
public boolean delete(String ftpFileName) {
try {
ftp.deleteFile(ftpFileName);
return true;
} catch (IOException e) {
log.error("delete ftp file error", e);
}
return false;
}
/**
* 关闭连接
*/
public void disconnect() {
if (ftp.isConnected()) {
try {
ftp.logout();
ftp.disconnect();
isConnected.set(false);
} catch (IOException e) {
log.error("ftp disconnect error", e);
}
}
}
/**
* 设置工作路径
*/
private boolean setWorkingDirectory(String dir) {
if (!isConnected.get()) {
return false;
}
//如果目录不存在创建目录
try {
if (createDirectory(dir)) {
return ftp.changeWorkingDirectory(dir);
}
} catch (IOException e) {
log.error("set working directory error", e);
}
return false;
}
/**
* 是否连接
*/
public boolean isConnected() {
return isConnected.get();
}
/**
* 创建目录
*/
private boolean createDirectory(String remote) throws IOException {
boolean success = true;
String directory = remote.substring(0, remote.lastIndexOf("/") + 1);
// 如果远程目录不存在,则递归创建远程服务器目录
if (!directory.equalsIgnoreCase("/") && !ftp.changeWorkingDirectory(directory)) {
int start = 0;
int end;
if (directory.startsWith("/")) {
start = 1;
}
end = directory.indexOf("/", start);
do {
String subDirectory = remote.substring(start, end);
if (!ftp.changeWorkingDirectory(subDirectory)) {
if (ftp.makeDirectory(subDirectory)) {
ftp.changeWorkingDirectory(subDirectory);
} else {
log.error("mack directory error :/" + subDirectory);
return false;
}
}
start = end + 1;
end = directory.indexOf("/", start);
// 检查所有目录是否创建完毕
} while (end > start);
}
return success;
}
}
Ftp工厂
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Bounded pool of {@link FtpConnection}s.
 *
 * <p>Idle connections live in an {@link ArrayBlockingQueue} whose capacity is {@code maxSize}
 * (available processors multiplied by {@code ftp.factory.thread.times}). Connections are created
 * lazily via {@link #fill(int)}. Callers borrow with {@link #getFtp()} and must hand every
 * borrowed connection back with {@link #safeRelease(FtpConnection)}.
 */
@Slf4j
@NoArgsConstructor
public class FtpFactory {
    /**
     * Bounded queue holding the idle connections.
     */
    private ArrayBlockingQueue<FtpConnection> arrayBlockingQueue;
    private FtpProperties ftpProperties;
    /**
     * Maximum number of connections, fixed at construction time; also the queue capacity.
     */
    private int maxSize;
    /**
     * Connections owned by the pool, including borrowed ones and ones still being created by
     * {@link #fill(int)}. Guarded by {@link #lock}.
     */
    private int ftpSize = 0;
    private final ReentrantLock lock = new ReentrantLock(false);

    /**
     * Creates the pool with a single connection; more are added on demand via {@link #fill(int)}.
     *
     * @param ftpProperties settings shared by every pooled connection
     */
    public FtpFactory(FtpProperties ftpProperties) {
        this.ftpProperties = ftpProperties;
        // Upper bound: processor count times the configured multiplier.
        this.maxSize = Runtime.getRuntime().availableProcessors() * ftpProperties.getThreadTimes();
        this.arrayBlockingQueue = new ArrayBlockingQueue<>(maxSize);
        // Start with one connection to save resources; grow later as needed.
        this.fill(1);
    }

    /**
     * Grows the pool until it owns {@code size} connections, capped at {@code maxSize}.
     * Does nothing when the pool already owns at least that many.
     *
     * @param size desired total number of connections (borrowed ones included)
     */
    public void fill(int size) {
        if (size <= 0) {
            return;
        }
        final int target = Math.min(size, maxSize);
        final ReentrantLock lock = this.lock;
        int current = 0;
        while (true) {
            lock.lock();
            try {
                current = ftpSize;
                if (current >= target) {
                    break;
                }
                // Reserve the slot under the lock so concurrent fill() calls cannot exceed maxSize.
                current = ++ftpSize;
            } finally {
                lock.unlock();
            }
            // Connect + login is slow network I/O; doing it outside the lock keeps
            // safeRelease() and close() from stalling while the pool grows.
            FtpConnection connection;
            try {
                connection = new FtpConnection(ftpProperties);
            } catch (RuntimeException e) {
                cancelReservation();
                throw e;
            }
            if (!this.safeOffer(connection)) {
                current = cancelReservation();
                break;
            }
        }
        log.info("Fill ftpConnection end, size is {}.", current);
    }

    /**
     * Returns a slot reserved by {@link #fill(int)} whose connection never entered the queue.
     *
     * @return the pool size after the adjustment
     */
    private int cancelReservation() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // close() may already have reset the counter to zero.
            if (ftpSize > 0) {
                ftpSize--;
            }
            return ftpSize;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Puts a connection into the idle queue; if the queue is full the connection is disconnected.
     *
     * @param connection connection to pool
     * @return whether the connection was queued
     */
    private boolean safeOffer(FtpConnection connection) {
        final ReentrantLock lock = this.lock;
        boolean offer;
        lock.lock();
        try {
            offer = arrayBlockingQueue.offer(connection);
        } finally {
            lock.unlock();
        }
        if (!offer) {
            log.debug("offer ftpConnection failed");
            // Logout is network I/O, so it runs outside the lock.
            connection.disconnect();
        }
        return offer;
    }

    /**
     * Borrows a connection, blocking until one is idle.
     *
     * <p>A borrowed connection that is not logged in (for example because its initial connect
     * failed) is replaced by a fresh one, so the pool does not keep handing out dead clients.
     *
     * @return a connection, or {@code null} if the waiting thread was interrupted
     *         (the interrupt flag is restored in that case)
     */
    public FtpConnection getFtp() {
        FtpConnection connection;
        try {
            // Blocks until a connection is returned to the queue.
            connection = arrayBlockingQueue.take();
        } catch (InterruptedException e) {
            // Restore the flag so the caller (e.g. an executor being shut down) can observe it.
            Thread.currentThread().interrupt();
            log.error("getFtpConnection error", e);
            return null;
        }
        if (!connection.isConnected()) {
            log.warn("ftpConnection is not connected, creating a new one");
            connection.disconnect();
            // Replacement keeps the pool size unchanged.
            connection = new FtpConnection(ftpProperties);
        }
        return connection;
    }

    /**
     * Returns a borrowed connection to the pool. {@code null} (from an interrupted
     * {@link #getFtp()}) is ignored.
     *
     * @param ftp connection obtained from {@link #getFtp()}
     */
    public void safeRelease(FtpConnection ftp) {
        if (ftp == null) {
            return;
        }
        this.safeOffer(ftp);
    }

    /**
     * Disconnects all idle connections and resets the pool size. Connections currently
     * borrowed are not touched by this call.
     */
    public void close() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (FtpConnection connection : arrayBlockingQueue) {
                connection.disconnect();
            }
            arrayBlockingQueue.clear();
            ftpSize = 0;
        } finally {
            lock.unlock();
        }
    }
}
工厂使用
使用方式
- 获取连接(阻塞,直到池中有空闲连接;等待中的线程被中断时返回null):
FtpConnection connection = ftpFactory.getFtp();
- 归还连接(使用完后必须归还,建议放在finally中):
ftpFactory.safeRelease(connection);
- 扩容连接池:
ftpFactory.fill(50);
50代表期望连接池中共有50个连接(包括已被取出的连接),注意有最大限制。最大限制为FtpFactory中的maxSize,等于处理器线程数(Runtime.availableProcessors())乘以ftp.factory.thread.times配置的倍数。
代码示例
/**
 * Example job: lists the files under {@code ftpFilePath}, then downloads and processes each one
 * on {@code ftpThreadPool}, waiting until every task has finished.
 *
 * @param ftpFactory  pool used for the listing and passed on to the per-file tasks
 * @param ftpFilePath remote directory to process
 */
public void yourJob(FtpFactory ftpFactory, String ftpFilePath) {
    try {
        // Borrow a connection only for the listing.
        FtpConnection connection = ftpFactory.getFtp();
        if (connection == null) {
            // getFtp() returns null when the waiting thread was interrupted.
            Logger.error("failed to borrow ftp connection, ftpFilePath:{}", ftpFilePath);
            return;
        }
        List<String> names;
        try {
            names = connection.fileNames(ftpFilePath);
        } finally {
            // Always give the connection back, even if listing throws.
            ftpFactory.safeRelease(connection);
        }
        if (ObjectUtils.isEmpty(names)) {
            Logger.error("ftp文件为空ftpFilePath:{}", ftpFilePath);
            return;
        }
        ftpFactory.fill(names.size());
        CountDownLatch countDownLatch = new CountDownLatch(names.size());
        Logger.info("本次文件路径 {}, 总数 {}, 开始处理", ftpFilePath, names.size());
        long start = System.currentTimeMillis();
        for (String fileName : names) {
            ftpThreadPool.execute(() -> {
                try {
                    // ftpFactory is passed so the task can borrow and return its own connection.
                    yourService.downloadAndAnalysis(ftpFactory, ftpFilePath, fileName);
                } finally {
                    // Count down even when the task fails; otherwise await() below blocks forever.
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        double seconds = (System.currentTimeMillis() - start) / 1000.0;
        Logger.info("完成文件路径 {}, 总数 {}, 耗时 {} 秒, 速度 {} 条/s",
                ftpFilePath, names.size(), seconds, names.size() / seconds);
    } catch (InterruptedException e) {
        // Preserve the interrupt for the scheduler/executor running this job.
        Thread.currentThread().interrupt();
        Logger.error("同步图片数据异常", e);
    } catch (Exception e) {
        Logger.error("同步图片数据异常", e);
    }
}
参考
License:
CC BY 4.0