Batch Processing
많은 데이터를 큰 단위로 분석하는 방법 (ex. ETL)이며, 이는 Spark의 RDD operation 을 이용해 표현 가능하다.
이를 위해서는 일단 몇 가지 단계를 거쳐야 한다.
1) RDD 생성
첫번째, 병렬화 된 collection을 만드는 방법
ex. 기존에 collection이 Python 프로그램이나 Java프로그램으로 짜여졌다면 그 언어를 RDD에 병렬 collection으로 바꾸는 함수가 있음
- 이 함수는 많은 경우에 테스트를 위해 작은 데이터를 분산화시켜 사용하고자 할 때 사용하는 경우가 많음.
두번째, 하둡 Datasets
ex. Hadoop 분산 파일시스템에 데이터를 가지고 있는 경우 해당하는 데이터 파일들의 위치를 textFile이라는 함수에 주면 읽어 RDD 생성
2) RDD 변환
첫번째, Map
- RDD에 있는 각 원소에 Map에 넘겨진 함수를 적용해 결과를 만들고 그 결과를 모아 RDD를 만듦
두번째, flatMap
- RDD에 있는 각 원소에 flatMap을 넘겨진 함수를 적용하고 그 결과를 iterator에 각각의 element를 꺼내 flat된 RDD를 만들어낸다
세번째, filter
- RDD에 포함된 element중에 filter 안에 표현된 조건을 만족하는 원소만 통과시켜 모은 RDD를 반환
[예제]
{"apple pear", "apple orange", "apple lemon grape"}
1) map 변환
- 보통 tokenize라는 함수를 수행(string을 받아 스페이스를 인식한 후 없애 각 word를 따로 따로 뱉어냄)
- rdd.map(tokenize) : {["apple", "pear"], ["apple", "orange"], ["apple", "lemon", "grape"]}
- 이 경우 RDD의 각 원소가 리스트가 됨(["apple", "pear"], ["apple", "orange"]...)
2) flatMap 변환
- tokenize함수를 적용하여 list의 collection형태가 아닌 string의 collection 형태로 나옴
- map을 적용해 나오는 리스트에 각 element를 꺼내 하나의 레벨로 만드는 일을 함
- rdd.flatMap(tokenize) : {"apple", "pear", "apple", "orange", "apple", "lemon", "grape"}
RDD 중에는 특별한 구조를 가진 것들이 있음(RDD에 포함된 element가 key/value pair 형태일 경우) -> Pair RDD
->각 key에 대해 병렬로 수행할 수 있는 operation을 보여주거나 네트워크에서 특정 key에 해당하는 데이터를 모으는 operation을 더 보여준다.
대표적인 operation에는 reduceByKey, join 등이 있다.
Pair RDD변환
- reduceByKey 변환 : 같은 key를 갖는 값들을 결합
- groupByKey 변환 : 같은 키를 갖는 값을 모음 \
- keys 변환 : Pair RDD는 key/value PAIR를 ELEMENT로 가지고 있는데 그 중 key만 모아 RDD를 만들고 return
- values 변환 : Pair RDD에서 value에 해당하는 부부남 모아 RDD를 만들어 return
- sortByKey 변환 : key에 대해 값을 sorting후 만들어진 RDD를 return
- join 변환 : 같은 key를 갖는 RDD element를 합해 각 key에 두 개의 RDD로 부터 오는 값들을 넣은 후에 최종적인 RDD 결과를 만들어냄
Spark Operation(Action)
-> Action은 Spark 가 결과를 만들어내도록 계산을 trigger 하는 operation
- collect() action : RDD의 원소를 모아 Spark에서 master에 해당하는 드라이버 프로그램에 결과로 돌려줌
- count() action : RDD 데이터 셋에 얼마나 많은 원소가 있는지 세어서 그 결과를 돌려줌
- first() action : RDD 데이터에서 첫 번째 원소를 반환
- take(n) action : RDD 데이터에서 처음 n개 원소를 반환
==============================================
↓ 계산된 RDD를 저장하는 Action 들
- saveAsTextFile(path) action : 주어진 path에 RDD데이터를 저장
- saveAsSequenceFile(path) action : Pair RDD 데이터를 SequenceFile 형태로 path에 저장
Spark를 짤 때 In-memory computing 활용하기 위해 프로그램에서 해주는 것이 Persistence이다.
-> 계산한 결과를 메모리에 저장 rdd.persist를 불러주면 됨.(RDD 값이 필요할 시 다시 만들지 않고 이렇게 저장된 값을 사용함)
-> RDD의 특정 부분이 손실된 경우 lineage라는 걸 통해 자동적으로 재계산을 함.
[예제]-Log Mining
다양한 시스템 log가 모여있을 때 하당하는 log를 mining하는 예제
lines=sc.textFile("hdfs://data/logs") # log파일을 읽어 line라는 RDD 생성
errors = lines.filter(lambda line:line.startWith("ERROR")) # 에러가 발생한 것을 logging한 line을 추출(filter변환) -> errors RDD 생성
messages = errors.map(lambda line:line.split()) # 해당하는 에러 line을 split 해서 여러 개의 word 리스트로 바꿈
.map(lambda words: words[1]) # 여러 word 중에 index1에 해당하는 string을 꺼냄 -> message RDD 생성
messages.persist() # caching messages
messages.filter(lambda line : "sshd" in line) # 만약 sshd(secure sheel daemon)과 관련된 것이 알고싶다면 messages에 sshd가 있는지 검사 한 뒤 messages로 추출
.count() # 얼마나 많은 message가 있는지 나옴
messages.filter(lambda line : "ngnix" in line) # sshd 뿐만 아니라 웹서버와 관련된 ngnix가 있는 내용도 messages에 추출
.count()
일단 7번 라인의 caching messages를 빼고 살펴보자
messages.filter.count -> count action 시 기존에 위에서 정의된 변환들이 수행 됨. (count라는 결과를 얻기 위해서는 lines -> errors -> messages -> messages(sshd 포함된) 만들어야함)
그리고 ngnix와 관련된 추출을 하고 count를 하면 위 과정이 반복됨. (사실 이 때 sshd messages의 count 연산은 할 필요가 없음 = 비효율적)
-> messages가 여러번 사용된다면 계산 후 저장했으면 좋겠음 (이것이 RDD Persistence)
-> 그래서 messages라는 RDD를 만들고 결과를 저장하라고 선언하면 메모리에 저장됨 -> 7번 라인
-> 그러면 ngnix count 연산 시 저장된 messages를 사용해 처리할 수 있음
'Dev Study > BigData' 카테고리의 다른 글
[Python] 1. 크롤링(1) (0) | 2019.07.09 |
---|---|
3. 빅데이터 배치 분석 및 대화형 질의(2) - 대화형 질의 (0) | 2019.04.18 |
Spark Test (0) | 2019.04.12 |
Hadoop 설치(2.7.7) (0) | 2019.04.12 |
2. 빅데이터 처리 시스템 개요 및 예시(3) - Spark 개요 (0) | 2019.04.12 |