slide-image

Batch Processing

많은 데이터를 큰 단위로 분석하는 방법 (ex. ETL)이며, 이는 Spark의 RDD operation 을 이용해 표현 가능하다. 

 

 

이를 위해서는 일단 몇 가지 단계를 거쳐야 한다.

 

 

1) RDD 생성
   첫번째, 병렬화 된 collection을 만드는 방법
ex. 기존에 collection이 Python 프로그램이나 Java프로그램으로 짜여졌다면 그 언어를 RDD에 병렬 collection으로 바꾸는 함수가 있음
- 이 함수는 많은 경우에 테스트를 위해 작은 데이터를 분산화시켜 사용하고자 할 때 사용하는 경우가 많음.
   두번째, 하둡 Datasets
ex. Hadoop 분산 파일시스템에 데이터를 가지고 있는 경우 해당하는 데이터 파일들의 위치를 textFile이라는 함수에 주면 읽어 RDD 생성

 


2) RDD 변환
    첫번째, Map
- RDD에 있는 각 원소에 Map에 넘겨진 함수를 적용해 결과를 만들고 그 결과를 모아 RDD를 만듦
    두번째, flatMap
- RDD에 있는 각 원소에 flatMap을 넘겨진 함수를 적용하고 그 결과를 iterator에 각각의 element를 꺼내 flat된 RDD를 만들어낸다
    세번째, filter
- RDD에 포함된 element중에 filter 안에 표현된 조건을 만족하는 원소만 통과시켜 모은 RDD를 반환

 

 

[예제]

  {"apple pear", "apple orange", "apple lemon grape"} 
1) map 변환
    - 보통 tokenize라는 함수를 수행(string을 받아 스페이스를 인식한 후 없애 각 word를 따로 따로 뱉어냄)
    - rdd.map(tokenize) : {["apple", "pear"], ["apple", "orange"], ["apple", "lemon", "grape"]}
    - 이 경우 RDD의 각 원소가 리스트가 됨(["apple", "pear"], ["apple", "orange"]...)
2) flatMap 변환
    - tokenize함수를 적용하여 list의 collection형태가 아닌 string의 collection 형태로 나옴
    - map을 적용해 나오는 리스트에 각 element를 꺼내 하나의 레벨로 만드는 일을 함
    - rdd.flatMap(tokenize) : {"apple", "pear", "apple", "orange", "apple", "lemon", "grape"}

 

 


 

 

RDD 중에는 특별한 구조를 가진 것들이 있음(RDD에 포함된 element가 key/value pair 형태일 경우) -> Pair RDD
->각 key에 대해 병렬로 수행할 수 있는 operation을 보여주거나 네트워크에서 특정 key에 해당하는 데이터를 모으는 operation을 더 보여준다. 

대표적인 operation에는 reduceByKey, join 등이 있다.

 

Pair RDD변환

  • reduceByKey 변환 : 같은 key를 갖는 값들을 결합
  • groupByKey 변환 : 같은 키를 갖는 값을 모음 \
  • keys 변환 : Pair RDD는 key/value PAIR를 ELEMENT로 가지고 있는데 그 중 key만 모아 RDD를 만들고 return 
  • values 변환 : Pair RDD에서 value에 해당하는 부부남 모아 RDD를 만들어 return 
  • sortByKey 변환 : key에 대해 값을 sorting후 만들어진 RDD를 return 
  • join 변환 : 같은 key를 갖는 RDD element를 합해 각 key에 두 개의 RDD로 부터 오는 값들을 넣은 후에 최종적인 RDD 결과를 만들어냄 

Spark Operation(Action)

-> Action은 Spark 가 결과를 만들어내도록 계산을 trigger 하는 operation

  • collect() action : RDD의 원소를 모아 Spark에서 master에 해당하는 드라이버 프로그램에 결과로 돌려줌
  • count() action : RDD 데이터 셋에 얼마나 많은 원소가 있는지 세어서 그 결과를 돌려줌
  • first() action : RDD 데이터에서 첫 번째 원소를 반환
  • take(n) action : RDD 데이터에서 처음 n개 원소를 반환

==============================================

↓ 계산된 RDD를 저장하는 Action 들

  • saveAsTextFile(path) action : 주어진 path에 RDD데이터를 저장
  • saveAsSequenceFile(path) action :  Pair RDD 데이터를 SequenceFile 형태로 path에 저장

 


 

 

Spark를 짤 때 In-memory computing 활용하기 위해 프로그램에서 해주는 것이 Persistence이다. 
-> 계산한 결과를 메모리에 저장 rdd.persist를 불러주면 됨.(RDD 값이 필요할 시 다시 만들지 않고 이렇게 저장된 값을 사용함)
-> RDD의 특정 부분이 손실된 경우 lineage라는 걸 통해 자동적으로 재계산을 함.

 

 

 

[예제]-Log Mining

다양한 시스템 log가 모여있을 때 하당하는 log를 mining하는 예제

 

lines=sc.textFile("hdfs://data/logs")  # log파일을 읽어 line라는 RDD 생성 

errors = lines.filter(lambda line:line.startWith("ERROR")) # 에러가 발생한 것을 logging한 line을 추출(filter변환) -> errors RDD 생성 
messages = errors.map(lambda line:line.split()) # 해당하는 에러 line을 split 해서 여러 개의 word 리스트로 바꿈 
          .map(lambda words: words[1])  # 여러 word 중에 index1에 해당하는 string을 꺼냄 -> message RDD 생성 

messages.persist() # caching messages 

messages.filter(lambda line : "sshd" in line) # 만약 sshd(secure sheel daemon)과 관련된 것이 알고싶다면 messages에 sshd가 있는지 검사 한 뒤 messages로 추출 
        .count()  # 얼마나 많은 message가 있는지 나옴 

messages.filter(lambda line : "ngnix" in line) # sshd 뿐만 아니라 웹서버와 관련된 ngnix가 있는 내용도 messages에 추출 
        .count()  

일단 7번 라인의 caching messages를 빼고 살펴보자

 

messages.filter.count -> count action 시 기존에 위에서 정의된 변환들이 수행 됨. (count라는 결과를 얻기 위해서는 lines -> errors -> messages -> messages(sshd 포함된) 만들어야함)
그리고 ngnix와 관련된 추출을 하고 count를 하면 위 과정이 반복됨.  (사실 이 때 sshd messages의 count 연산은 할 필요가 없음 = 비효율적)

 

-> messages가 여러번 사용된다면 계산 후 저장했으면 좋겠음 (이것이 RDD Persistence)
-> 그래서 messages라는 RDD를 만들고 결과를 저장하라고 선언하면 메모리에 저장됨 -> 7번 라인
-> 그러면 ngnix count 연산 시 저장된 messages를 사용해 처리할 수 있음