Transfering a file between HDFS and SFTP without copying it to local disk。
org.apache.commons.compress.utils.IOUtils。
大數據分析一個很重要的基礎就是你需要各種不同的資料輔助做計算。
而這些資料往往來自於很多不同的系統,也因此在很多時候資料工程師除了撰寫處理的程式外,也必須兼具外部資料引入的技術。
系統之間的介接有很多種,SFTP 算是很常見的方式,取得 SFTP 的檔案後將其儲存至 Server 的 local 端,然後 put 到 HDFS 上,一般來說標準程序都會這樣做。
但這樣卻有個缺點,你必須預留 local 的空間和路徑,而且當你需要做伺服器移轉的時候,local 路徑的資料夾與維護也會造成困擾。
因此我想要實作出不用將檔案落至地端便可以在 SFTP 與 HDFS 之間傳輸的功能,以下就用 spark shell 與 jsch.0.1.55.jar 這個套件(SFTP工具包)來實現我們想要的功能。
首先我們先實作從 SFTP 讀檔 sftpToHdfs.txt 後直接傳輸至 HDFS。
import org.apache.hadoop.conf.Configuration
import java.io.InputStream
import java.io.OutputStream
import org.apache.hadoop.fs.Path
import org.apache.commons.compress.utils.IOUtils
import org.apache.hadoop.fs.FileSystem
import com.jcraft.jsch.ChannelSftp
import com.jcraft.jsch.Session
import com.jcraft.jsch.JSch
object DirectSftpAndHDFS {
def main(args: Array[String]): Unit = {
var channelSftp: ChannelSftp = null
var session: Session = null
var input: InputStream = null
var os: OutputStream = null
try {
//sftp to hdfs
val conff = new Configuration(sc.hadoopConfiguration)
conff.addResource(new Path("/usr/local/hadoop/etc/hadoop/core-site.xml"))//這段在cloudera的CDH上似乎不需要加
val fileSystem = FileSystem.get(conff)
val dst = new Path("hdfs://master:9000//user/hduser/input/sftpToHdfs.txt")//最後要丟到HDFS的檔名路徑
os = fileSystem.create(dst)
//連接SFTP
val jss = new JSch()
session = jss.getSession("test", "192.168.159.128", 22)
session.setPassword("sftptest")
val config = new java.util.Properties()
config.put("StrictHostKeyChecking", "no")
session.setConfig(config)
session.connect()
channelSftp = session.openChannel("sftp").asInstanceOf[ChannelSftp]
channelSftp.connect()
input = channelSftp.get("/upload/sftpToHdfs.txt")//sftp上要取得的檔案
IOUtils.copy(input, os)//把SFTP檔案取得的InputStream與HDFS取得的OutputStream帶入做傳輸
} catch {
case ex: Exception => {
println(ex.toString())
}
} finally {
if(null != input) input.close()
if(null != os) os.close()
if(null != channelSftp) channelSftp.disconnect()
if(null != session) session.disconnect()
}
}
}
接著實作從 HDFS 讀檔 hdfsToSftp.txt 後直接傳輸至 SFTP,基本上就是 InputStream 與 OutputStream 取得來源相反,InputStream 改為 FSDataInputStream。
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.commons.compress.utils.IOUtils
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FSDataInputStream
import com.jcraft.jsch.ChannelSftp
import com.jcraft.jsch.Session
import com.jcraft.jsch.JSch
object DirectSftpAndHDFS {
def main(args: Array[String]): Unit = {
var channelSftp: ChannelSftp = null
var session: Session = null
var inStream: FSDataInputStream = null
try {
//hdfs to sftp
val fileSystem = FileSystem.get(sc.hadoopConfiguration)
val from = new Path("hdfs://master:9000//user/hduser/input/hdfsToSftp.txt")
val jss = new JSch()
session = jss.getSession("test", "192.168.159.128", 22)
session.setPassword("sftptest")
val config = new java.util.Properties()
config.put("StrictHostKeyChecking", "no")
session.setConfig(config)
session.connect()
channelSftp = session.openChannel("sftp").asInstanceOf[ChannelSftp]
channelSftp.connect()
val conff = new Configuration(sc.hadoopConfiguration)
conff.addResource(new Path("/usr/local/hadoop/etc/hadoop/core-site.xml"))
inStream = FileSystem.get(conff).open(from)
channelSftp.put(inStream, "/upload/hdfsToSftp.txt", ChannelSftp.OVERWRITE)
//channelSftp.rename("", "")
} catch {
case ex: Exception => {
println(ex.toString())
}
} finally {
if(null != inStream) inStream.close()
if(null != channelSftp) channelSftp.disconnect()
if(null != session) session.disconnect()
}
}
}
在 Spark shell 下執行這些 code。
執行結果如下: