Disaggregated HDP Spark and Hive with MinIO
MinIO를 사용한 분리된 HDP Spark 및 Hive
1. Cloud-native Architecture
1. 클라우드 네이티브 아키텍처
Kubernetes manages stateless Spark and Hive containers elastically on the compute nodes. Spark has native scheduler integration with Kubernetes. Hive, for legacy reasons, uses YARN scheduler on top of Kubernetes.
Kubernetes는 컴퓨팅 노드에서 상태 비저장 Spark 및 Hive 컨테이너를 탄력적으로 관리합니다. Spark에는 Kubernetes와 기본 스케줄러 통합이 있습니다. Hive는 기존 이유로 인해 Kubernetes 위에 YARN 스케줄러를 사용합니다.
All access to MinIO object storage is via S3/SQL SELECT API. In addition to the compute nodes, MinIO containers are also managed by Kubernetes as stateful containers with local storage (JBOD/JBOF) mapped as persistent local volumes.
MinIO 객체 스토리지에 대한 모든 액세스는 S3/SQL SELECT API를 통해 이루어집니다. 컴퓨팅 노드 외에도 MinIO 컨테이너는 Kubernetes에서 영구 로컬 볼륨으로 매핑된 로컬 스토리지 (JBOD/JBOF)가 있는 상태 저장 컨테이너로 관리됩니다.
This architecture enables multi-tenant MinIO, allowing isolation of data between customers.
이 아키텍처는 다중 테넌트 MinIO를 지원하여 고객 간에 데이터를 격리할 수 있습니다.
MinIO also supports multi-cluster, multi-site federation similar to AWS regions and tiers.
MinIO는 AWS 구역 및 계층과 유사한 다중 클러스터, 다중 사이트 연합도 지원합니다.
Using MinIO Information Lifecycle Management (ILM), you can configure data to be tiered between NVMe based hot storage, and HDD based warm storage. All data is encrypted with per-object key.
MinIO 정보 수명주기 관리 (ILM)를 사용하명 NVMe 기반 핫 스토리지와 HDD 기반 웜 스토리지 간에 계층화되도록 데이터를 구성할 수 있습니다. 모든 데이터는 객체별 키로 암호화됩니다.
Access Control and Identity Management between the tenants are managed by MinIO using OpenID Connect or Kerberos/LDAP/AD.
테넌트 간의 액세스 제어 및 ID 관리는 OpenID Connect 또는 Kerberos/LDAP/AD를 사용하여 MinIO에서 관리됩니다.
2. Prerequisites
2. 전제 조건
Install Hortonworks Distribution using this guide.
이 가이드를 사용하여 Hortonworks 분산을 설치하십시오.
Setup Ambari which automatically sets up YARN
YARN을 자동으로 설정하는 Ambari를 설정
Installing Spark
스파크 설치
Install MinIO Distributed Server using one of the guides below.
아래 가이드 중 하나를 사용하여 MinIO 분산 서버를 설치하세요.
Deployment based on Kubernetes
Kubernetes 기반 배포
Deployment based on MinIO Helm Chart
MinIO Helm 차트를 기반으로 한 배포
3. Configure Hadoop, Spark, Hive to use MinIO
3. MinIO를 사용하도록 Hadoop, Spark, Hive 구성
After successful installation navigate to the Ambari UI http://
성공적으로 설치한 후 Ambari UI http://
3.1 Configure Hadoop
3.1 하둡 구성
Navigate to Services -> HDFS -> CONFIGS -> ADVANCED as shown below
아래와 같이 서비스-> HDFS -> 구성 -> 고급으로 이동합니다
Navigate to Custom core-site to configure MinIO parameters for _s3a_ connector
_s3a_ 커넥터에 대한 MinIO 매개변수를 구성하려면 사용자 정의 코어 사이트로 이동하세요
Copy
복사
sudo pip install yq
alias kv-pairify='xq ".configuration[]" | jq ".[]" | jq -r ".name + \"=\" + .value"'
Let's take for example a set of 12 compute nodes with an aggregate memory of 1.2TiB, we need to do following settings for optimal results.
예를 들어 총 메모리가 1.2TiB인 12개의 컴퓨팅 노드 세트를 예로 들어 보겠습니다. 최적의 결과를 얻으려면 다음 설정을 수행해야 합니다.
Add the following optimal entries for core-site.xml to configure s3a with MinIO. Most important options here are
MinIO로 s3a를 구성하려면 core-site.xml에 대해 다음 최적 항목을 추가하세요. 여기서 가장 중요한 옵션은 다음과 같습니다
Copy
복사
cat ${HADOOP_CONF_DIR}/core-site.xml | kv-pairify | grep "mapred"
mapred.maxthreads.generate.mapoutput=2 # Num threads to write map outputs
mapred.maxthreads.generate.mapoutput=2 # 맵 출력을 쓰기 위한 스레드 수
mapred.maxthreads.partition.closer=0 # Asynchronous map flushers
mapred.maxthreads.partition.closer=0 # 비동기 맵 플러셔
mapreduce.fileoutputcommitter.algorithm.version=2 # Use the latest committer version
mapreduce.fileoutputcommitter.algorithm.version =2 # 최신 커미터 버전을 사용하세요
mapreduce.job.reduce.slowstart.completedmaps=0.99 # 99% map, then reduce
mapreduce.job.reduce.slowstart.completedmaps=0.99 # 99% 맵 후 축소합니다
mapreduce.reduce.shuffle.input.buffer.percent=0.9 # Min % buffer in RAM
mapreduce.reduce.shuffle.input.buffer.percent=0.9 # RAM의 최소 % 버퍼
mapreduce.reduce.shuffle.merge.percent=0.9 # Minimum % merges in RAM
mapreduce.reduce.shuffle.merge.percent=0.9 # RAM에 병합되는 최소 %
mapreduce.reduce.speculative=false # Disable speculation for reducing
mapreduce.reduce.speculative=false # 감소를 위한 추측을 비활성화합니다
mapreduce.task.io.sort.factor=999 # Threshold before writing to disk
mapreduce.task.io.sort.factor=999 # 디스크에 쓰기 전 임계값
mapreduce.task.sort.spill.percent=0.9 # Minimum % before spilling to disk
mapreduce.task.sort.spill.percent=0.9 # 디스크로 보내기 전 최소 %
S3A is the connector to use S3 and other S3-compatible object stores such as MinIO. MapReduce workloads typically interact with object stores in the same way they do with HDFS.
S3A는 S3 및 MinIO와 같은 기타 S3 호환 객체 저장소를 사용하기 위한 커넥터입니다. MapReduce 워크로드는 일반적으로 HDFS와 동일한 방식으로 객체 저장소와 상호 작용합니다.
These workloads rely on HDFS atomic rename functionality to complete writing data to the datastore. Object storage operations are atomic by nature and they do not require/implement rename API.
이러한 워크로드는 HDFS 원자 이름 변경 기능을 사용하여 데이터 저장소에 데이터 쓰기를 완료합니다. 객체 스토리지 작업은 본질적으로 원자적이며 이름 바꾸기 API가 필요/구현되지 않습니다.
The default S3A committer emulates renames through copy and delete APIs. This interaction pattern causes significant loss of performance because of the write amplification.
기본 S3A 커미터는 복사 및 삭제 API를 통해 이름 바꾸기를 에뮬레이트(모방)합니다. 이 상호 작용 패턴은 쓰기 증폭으로 인해 성능이 크게 저하됩니다.
Netflix, for example, developed two new staging committers - the Directory staging committer and the Partitioned staging committer - to take full advantage of native object storage operations.
예를 들어, Netflix는 기본 객체 스토리지 작업을 최대한 활용하기 위해 두 개의 새로운 스테이징 커미터 (Directory 스테이징 커미터와 Partitioned 스테이징 커미터)를 개발했습니다.
These committers do not require rename operation. The two staging committers were evaluated, along with another new addition called the Magic committer for benchmarking.
이러한 커미터에는 이름 바꾸기 작업이 필요하지 않습니다. 벤치마킹을 위한 Magic 커미터라는 또 다른 새로운 추가 기능과 함께 두 개의 스테이징 커미터가 평가되었습니다.
It was found that the directory staging committer was the fastest among the three, S3A connector should be configured with the following parameters for optimal results:
디렉토리 스테이징 커미터가 세 가지 중에서 가장 빠른 것으로 나타났습니다. 최적의 결과를 얻으려면 S3A 커넥터를 다음 매개 변수로 구성해야 합니다.
Copy
복사
cat ${HADOOP_CONF_DIR}/core-site.xml | kv-pairify | grep "s3a"
fs.s3a.access.key=minio
fs.s3a.secret.key=minio123
fs.s3a.path.style.access=true
fs.s3a.block.size=512M
fs.s3a.buffer.dir=${hadoop.tmp.dir}/s3a
fs.s3a.committer.magic.enabled=false
fs.s3a.committer.name=directory
fs.s3a.committer.staging.abort.pending.uploads=true
fs.s3a.committer.staging.conflict-mode=append
fs.s3a.committer.staging.tmp.path=/tmp/staging
fs.s3a.committer.staging.unique-filenames=true
fs.s3a.connection.establish.timeout=5000
fs.s3a.connection.ssl.enabled=false
fs.s3a.connection.timeout=200000
fs.s3a.endpoint=http://minio:9000
fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
fs.s3a.committer.threads=2048 # Number of threads writing to MinIO
fs.s3a.committer.threads=2048 # MinIO에 쓰는 스레드 수
fs.s3a.connection.maximum=8192 # Maximum number of concurrent conns
fs.s3a.connection.maximum=8192 # 최대 동시 연결 수
fs.s3a.fast.upload.active.blocks=2048 # Number of parallel uploads
fs.s3a.fast.upload.active.blocks=2048 # 병렬 업로드 수
fs.s3a.fast.upload.buffer=disk # Use disk as the buffer for uploads
fs.s3a.fast.upload.buffer=disk # 디스크를 업로드용 버퍼로 사용
fs.s3a.fast.upload=true # Turn on fast upload mode
fs.s3a.fast.upload=true # 빠른 업로드 모드를 켭니다
fs.s3a.max.total.tasks=2048 # Maximum number of parallel tasks
fs.s3a.max.total.tasks=2048 # 최대 병렬 작업 수
fs.s3a.multipart.size=512M # Size of each multipart chunk
fs.s3a.multipart.size=512M # 각 멀티파트 청크의 크기
fs.s3a.multipart.threshold=512M # Size before using multipart uploads
fs.s3a.multipart.threshold=512M # 멀티파트 업로드를 사용하기 전의 크기
fs.s3a.socket.recv.buffer=65536 # Read socket buffer hint
fs.s3a.socket.recv.buffer=65536 # 소켓 버퍼 힌트 읽기
fs.s3a.socket.send.buffer=65536 # Write socket buffer hint
fs.s3a.socket.send.buffer=65536 # 소켓 버퍼 힌트 쓰기
fs.s3a.threads.max=2048 # Maximum number of threads for S3A
fs.s3a.threads.max=2048 # S3A의 최대 스레드 수
The rest of the other optimization options are discussed in the links below
나머지 최적화 옵션은 아래 링크에서 설명합니다.
https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html
https://hadoop.apache.org/docs/r3.1.1/hadoop-aws/tools/hadoop-aws/committers.html
Once the config changes are applied, proceed to restart Hadoop services.
구성 변경 사항이 적용되면 Hadoop 서비스를 다시 시작합니다.
3.2 Configure Spark2
3.2 Spark2 구성
Navigate to Services -> Spark2 -> CONFIGS as shown below
아래와 같이 서비스 -> Spark2 -> 구성으로 이동합니다
Navigate to “Custom spark-defaults” to configure MinIO parameters for _s3a_ connector
_s3a_ 커넥터에 대한 MinIO 매개변수를 구성하려면 “Custom spark-defaults”로 이동하세요
Add the following optimal entries for spark-defaults.conf to configure Spark with MinIO.
MinIO로 Spark를 구성하려면 Spark-defaults.conf에 대해 다음 최적 항목을 추가하세요.
Copy
복사
spark.hadoop.fs.s3a.access.key minio
spark.hadoop.fs.s3a.secret.key minio123
spark.hadoop.fs.s3a.path.style.access true
spark.hadoop.fs.s3a.block.size 512M
spark.hadoop.fs.s3a.buffer.dir ${hadoop.tmp.dir}/s3a
spark.hadoop.fs.s3a.committer.magic.enabled false
spark.hadoop.fs.s3a.committer.name directory
spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads true
spark.hadoop.fs.s3a.committer.staging.conflict-mode append
spark.hadoop.fs.s3a.committer.staging.tmp.path /tmp/staging
spark.hadoop.fs.s3a.committer.staging.unique-filenames true
spark.hadoop.fs.s3a.committer.threads 2048 # number of threads writing to MinIO
spark.hadoop.fs.s3a.committer.threads 2048 # MinIO에 쓰는 스레드 수
spark.hadoop.fs.s3a.connection.establish.timeout 5000
spark.hadoop.fs.s3a.connection.establish.timeout 5000
spark.hadoop.fs.s3a.connection.maximum 8192 # maximum number of concurrent conns
spark.hadoop.fs.s3a.connection.maximum 8192 # 최대 동시 연결 수
spark.hadoop.fs.s3a.connection.ssl.enabled false
spark.hadoop.fs.s3a.connection.ssl.enabled false
spark.hadoop.fs.s3a.connection.timeout 200000
spark.hadoop.fs.s3a.connection.timeout 200000
spark.hadoop.fs.s3a.endpoint http://minio:9000
spark.hadoop.fs.s3a.endpoint http://minio:9000
spark.hadoop.fs.s3a.fast.upload.active.blocks 2048 # number of parallel uploads
spark.hadoop.fs.s3a.fast.upload.active.blocks 2048 # 동시 업로드 수
spark.hadoop.fs.s3a.fast.upload.buffer disk # use disk as the buffer for uploads
spark.hadoop.fs.s3a.fast.upload.buffer disk # 디스크를 업로드용 버퍼로 사용
spark.hadoop.fs.s3a.fast.upload true # turn on fast upload mode
spark.hadoop.fs.s3a.fast.upload true # 빠른 업로드 모드를 켭니다
spark.hadoop.fs.s3a.impl org.apache.hadoop.spark.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.max.total.tasks 2048 # maximum number of parallel tasks
spark.hadoop.fs.s3a.max.total.tasks 2048 # 최대 병렬 작업 수
spark.hadoop.fs.s3a.multipart.size 512M # size of each multipart chunk
spark.hadoop.fs.s3a.multipart.size 512M # 각 멀티파트 청크의 크기
spark.hadoop.fs.s3a.multipart.threshold 512M # size before using multipart uploads
spark.hadoop.fs.s3a.multipart.threshold 512M # 멀티파트 업로드를 사용하기 전의 크기
spark.hadoop.fs.s3a.socket.recv.buffer 65536 # read socket buffer hint
spark.hadoop.fs.s3a.socket.recv.buffer 65536 # 소켓 버퍼 힌트 읽기
spark.hadoop.fs.s3a.socket.send.buffer 65536 # write socket buffer hint
spark.hadoop.fs.s3a.socket.send.buffer 65536 # 소켓 버퍼 힌트 쓰기
spark.hadoop.fs.s3a.threads.max 2048 # maximum number of threads for S3A
spark.hadoop.fs.s3a.threads.max 2048 # S3A의 최대 스레드 수
Once the config changes are applied, proceed to restart Spark services.
구성 변경 사항이 적용되면 Spark 서비스를 다시 시작하세요.
3.3 Configure Hive
3.3 하이브 구성
Navigate to Services -> Hive -> CONFIGS-> ADVANCED as shown below
아래와 같이 서비스 -> Hive -> 구성 -> 고급으로 이동합니다.
Navigate to “Custom hive-site” to configure MinIO parameters for _s3a_ connector
_s3a_ 커넥터에 대한 MinIO 매개변수를 구성하려면 "사용자 정의 하이브 사이트"로 이동하세요.
Add the following optimal entries for hive-site.xml to configure Hive with MinIO.
MinIO로 Hive를 구성하려면 hive-site.xml에 대해 다음 최적 항목을 추가하세요.
Copy
복사
hive.blobstore.use.blobstore.as.scratchdir=true
hive.exec.input.listing.max.threads=50
hive.load.dynamic.partitions.thread=25
hive.metastore.fshandler.threads=50
hive.mv.files.threads=40
mapreduce.input.fileinputformat.list-status.num-threads=50
For more information about these options please visit https://www.cloudera.com/documentation/enterprise/5-11-x/topics/admin_hive_on_s3_tuning.html
이러한 옵션에 대한 자세한 내용을 보려면 다음 사이트를 방문하세요. https://www.cloudera.com/documentation/enterprise/5-11-x/topics/admin_hive_on_s3_tuning.html
Once the config changes are applied, proceed to restart all Hive services.
구성 변경 사항이 적용되면 모든 Hive 서비스를 다시 시작합니다.
4. Run Sample Applications
4. 샘플 애플리케이션 실행
After installing Hive, Hadoop and Spark successfully, we can now proceed to run some sample applications to see if they are configured appropriately.
Hive, Hadoop 및 Spark를 성공적으로 설치한 후 이제 일부 샘플 애플리케이션을 실행하여 적절하게 구성되었는지 확인할 수 있습니다.
We can use Spark Pi and Spark WordCount programs to validate our Spark installation. We can also explore how to run Spark jobs from the command line and Spark shell.
Spark Pi 및 Spark WordCount 프로그램을 사용하여 Spark 설치를 확인할 수 있습니다. 명령줄과 Spark 셸에서 Spark 작업을 실행하는 방법도 알아볼 수 있습니다.
4.1 Spark Pi
4.1 스파크 파이
Test the Spark installation by running the following compute intensive example, which calculates pi by “throwing darts” at a circle.
원에 "다트를 던져" 파이를 계산하는 다음 계산 집약적 예제를 실행하여 Spark 설치를 테스트합니다.
The program generates points in the unit square ((0,0) to (1,1)) and counts how many points fall within the unit circle within the square. The result approximates pi.
프로그램은 단위 정사각형((0,0) ~ (1,1))에 점을 생성하고 정사각형 내의 단위원 내에 몇 개의 점이 속하는지 계산합니다. 결과는 pi에 가깝습니다.
Follow these steps to run the Spark Pi example:
Spark Pi 예제를 실행하려면 다음 단계를 따르세요.
Login as user ‘spark’.
사용자 '스파크'로 로그인하세요.
When the job runs, the library can now use MinIO during intermediate processing.
작업이 실행되면 라이브러리는 이제 중간 처리 중에 MinIO를 사용할 수 있습니다.
Navigate to a node with the Spark client and access the spark2-client directory:
Spark 클라이언트가 있는 노드로 이동하여 Spark2-client 디렉터리에 액세스합니다.
Copy
복사
cd /usr/hdp/current/spark2-client
su spark
Run the Apache Spark Pi job in yarn-client mode, using code from org.apache.spark:
org.apache.spark의 코드를 사용하여 Yarn-client 모드에서 Apache Spark Pi 작업을 실행합니다.
Copy
복사
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn-client \
--num-executors 1 \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
examples/jars/spark-examples*.jar 10
The job should produce an output as shown below. Note the value of pi in the output.
작업은 아래와 같이 출력을 생성해야 합니다. 출력에서 pi 값을 확인합니다.
17/03/22 23:21:10 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 1.302805 s
Pi is roughly 3.1445191445191445
Job status can also be viewed in a browser by navigating to the YARN ResourceManager Web UI and clicking on job history server information.
YARN ResourceManager 웹 UI로 이동하고 작업 기록 서버 정보를 클릭하여 브라우저에서 작업 상태를 볼 수도 있습니다.
4.2 WordCount
4.2 WordCount
WordCount is a simple program that counts how often a word occurs in a text file. The code builds a dataset of (String, Int) pairs called counts, and saves the dataset to a file.
WordCount는 텍스트 파일에서 단어가 얼마나 자주 나타나는지를 계산하는 간단한 프로그램입니다. 코드는 counts라는 (String, Int) 쌍의 데이터세트를 구축하고 데이터세트를 파일에 저장합니다.
The following example submits WordCount code to the Scala shell. Select an input file for the Spark WordCount example. We can use any text file as input.
다음 예에서는 WordCount 코드를 Scala 셸에 제출합니다. Spark WordCount 예제에 대한 입력 파일을 선택합니다. 모든 텍스트 파일을 입력으로 사용할 수 있습니다.
Login as user ‘spark’.
사용자 '스파크'로 로그인하세요.
When the job runs, the library can now use MinIO during intermediate processing.
작업이 실행되면 라이브러리는 중간 처리 중에 MinIO를 사용할 수 있습니다.
Navigate to a node with Spark client and access the spark2-client directory:
스파크 클라이언트가 있는 노드로 이동하여 spark2-client 디렉토리에 액세스합니다.
cd /usr/hdp/current/spark2-client
su spark
The following example uses log4j.properties as the input file:
다음 예에서는 log4j.properties를 입력 파일로 사용합니다.
4.2.1 Upload the input file to HDFS:
4.2.1 HDFS에 입력 파일 업로드:
hadoop fs -copyFromLocal /etc/hadoop/conf/log4j.properties
s3a://testbucket/testdata
4.2.2 Run the Spark shell:
4.2.2 스파크 셸 실행:
./bin/spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m
The command should produce an output as shown below. (with additional status messages):
명령은 아래와 같이 출력을 생성해야 합니다. (추가 상태 메시지 포함):
Spark context Web UI available at http://172.26.236.247:4041 Spark context available as 'sc' (master = yarn, app id =
Spark 컨텍스트 웹 UI는 http://172.26.236.247:4041에서 사용할 수 있습니다. 'sc'(마스터 = 원사, 앱 ID =)로 사용 가능한 Spark 컨텍스트
application_1490217230866_0002).
Spark session available as 'spark'.
Spark 세션은 'spark'로 사용 가능합니다.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.0.2.6.0.0-598
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Scala 버전 2.11.8(Java HotSpot(TM) 64비트 서버 VM, Java 1.8.0_112) 사용
Type in expressions to have them evaluated.
평가할 표현식을 입력하세요.
Type :help for more information.
자세한 내용을 보려면 :help를 입력하세요.
scala>
At the scala> prompt, submit the job by typing the following commands, Replace node names, file name, and file location with your values:
scala> 프롬프트에서 다음 명령을 입력하여 작업을 제출합니다. 노드 이름, 파일 이름 및 파일 위치를 원하는 값으로 바꿉니다.
scala> val file = sc.textFile("s3a://testbucket/testdata")
file: org.apache.spark.rdd.RDD[String] = s3a://testbucket/testdata MapPartitionsRDD[1] at textFile at :24
scala> val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at :25
scala> counts.saveAsTextFile("s3a://testbucket/wordcount")
Use one of the following approaches to view job output:
작업 출력을 보려면 다음 방법 중 하나를 사용하세요:
View output in the Scala shell:
스칼라 쉘에서 출력을 봅니다:
scala> counts.count()
364
To view the output from MinIO exit the Scala shell. View WordCount job status:
MinIO의 출력을 보려면 스칼라 쉘을 종료하세요. WordCount 작업 상태 보기:
Copy
복사
hadoop fs -ls s3a://testbucket/wordcount
The output should be similar to the following:
출력은 다음과 유사해야 합니다.
Found 3 items
3개 항목을 찾았습니다
-rw-rw-rw- 1 spark spark 0 2019-05-04 01:36 s3a://testbucket/wordcount/_SUCCESS
-rw-rw-rw- 1 spark spark 4956 2019-05-04 01:36 s3a://testbucket/wordcount/part-00000
-rw-rw-rw- 1 spark spark 5616 2019-05-04 01:36 s3a://testbucket/wordcount/part-00001