Найдите размер данных, хранящихся в rdd, из текстового файла в apache spark

Я новичок в Apache Spark (версия 1.4.1). Я написал небольшой код для чтения текстового файла и сохранил его данные в Rdd.

Есть ли способ получить размер данных в rdd.

Это мой код:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.SizeEstimator
import org.apache.spark.sql.Row

object RddSize {

  def main(args: Array[String]) {

    val sc = new SparkContext("local", "data size")
    val FILE_LOCATION = "src/main/resources/employees.csv"
    val peopleRdd = sc.textFile(FILE_LOCATION)

    val newRdd = peopleRdd.filter(str => str.contains(",M,"))
    //Here I want to find whats the size remaining data
  }
} 

Я хочу получить размер данных до преобразования фильтра (peopleRdd) и после него (newRdd).


person bob    schedule 24.08.2015    source источник
comment
Что вы имеете в виду под размером? Количество строк в СДР? Если это так, то это делает функция count RDD - def count(): Long Return the number of elements in the RDD. из документа Spark.   -  person The Archetypal Paul    schedule 24.08.2015
comment
@Paul Нет, здесь размер не означает, что рядов нет. Предположим, мой файл имеет размер 100 МБ, тогда я получил данные файла в rdd и применил фильтр. Данные должны быть уменьшены в своем размере. Я хочу получить этот размер (в МБ)   -  person bob    schedule 24.08.2015
comment
Не уверен, что ты хочешь этого. RDD ленивы, поэтому для newRDD еще ничего не было выполнено. Если вам нужен размер, вы заставите его оценить и, вероятно, сделаете слишком много работы.,   -  person The Archetypal Paul    schedule 24.08.2015
comment
Спасибо за ответ @Paul, я новичок в Spark. Понятия не имею, как заставить его оценить размер. Не могли бы вы дать несколько советов.   -  person bob    schedule 24.08.2015
comment
Вы не хотите заставлять его оценивать размер, но оценивать ответ в целом. Почему тебя волнует размер?   -  person The Archetypal Paul    schedule 24.08.2015
comment
@Paul Я работаю над приложением, которое принимает файл и фильтрует некоторые данные. В ответ я хочу показать размер данных, который действительно полезен после обработки большого файла (может быть 1 ГБ).   -  person bob    schedule 24.08.2015
comment
Еще раз извините. Почему? Если это не окончательный ответ и дополнительная обработка не требуется. В этом случае запишите его в файл и посмотрите на размер файла. Я не понимаю, почему интересно знать размер, который он потребляет в памяти.   -  person The Archetypal Paul    schedule 24.08.2015
comment
На самом деле это не окончательный ответ. В зависимости от пользователя, если пользователь хочет @Paul, он может сделать еще несколько фильтров на основе оставшегося размера. Для этого мне нужно показать начальный размер и оставшийся размер. После того, как преобразования будут выполнены, данные должны быть сохранены в искровом sql (я знаю эту часть, я застрял в поиске размера).   -  person bob    schedule 24.08.2015
comment
Пожалуйста, объясните, почему вам нужен размер для оставшейся фильтрации. Spark работает (и увеличивает свою производительность), будучи «ленивым» и выполняя фактические вычисления только тогда, когда требуется результат. Итак, обычно в середине вычисления не остается оставшегося размера, потому что вычисление еще не было выполнено. Поэтому, пожалуйста, объясните, что вы пытаетесь сделать в целом - потому что очень вероятно, что знать размер в середине не обязательно, или это отрицательно повлияет на производительность.   -  person The Archetypal Paul    schedule 24.08.2015
comment
@Paul Допустим, у меня есть файл размером 100мб. Я прочитал это в RDD, теперь я сделал некоторые преобразования в RDD. После этого я создал таблицу в искровом sql из этого RDD. Я хочу знать, каков размер данных (увеличился или уменьшился) в RDD непосредственно перед созданием таблицы в Spark sql. Надеюсь, теперь моя проблема ясна. Спасибо, что разобрались в моей проблеме :). Пожалуйста, дайте мне знать, возможно это или нет.   -  person bob    schedule 24.08.2015
comment
Нет, не понятно. ПОЧЕМУ вы хотите знать размер? Вы все время говорите мне, что хотите знать размер, но почему? Что ты собираешься делать с ответом? Если вы хотите знать, каков будет размер таблицы в памяти, прежде чем создавать ее, то нет, я не думаю, что это возможно. Я не намеренно тупой, я понятия не имею, как знание RDD 100 или 90 МБ или 110 МБ поможет вам в любом случае   -  person The Archetypal Paul    schedule 24.08.2015
comment
Таблица Spark sql сжимает данные. так что это бесполезно, иначе я бы пошел с размером памяти. Большое спасибо за помощь :)   -  person bob    schedule 24.08.2015


Ответы (3)


Есть несколько способов получить размер RDD

1. Добавьте слушателя искры в свой контекст искры

SparkDriver.getContext.addSparkListener(new SparkListener() {
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
  val map = stageCompleted.stageInfo.rddInfos
  map.foreach(row => {
      println("rdd memSize " + row.memSize)
      println("rdd diskSize " + row.diskSize)
   })
}})

2. Сохраните rdd как текстовый файл.

myRDD.saveAsTextFile("person.txt")

и вызовите API REST Apache Spark.

/applications/[app-id]/stages

3. Вы также можете попробовать SizeEstimater.

val rddSize = SizeEstimator.estimate(myRDD)
person Gabber    schedule 27.08.2015
comment
Спасибо !! Мы попробуем их и сообщим вам в случае возникновения каких-либо проблем. - person bob; 28.08.2015
comment
@Gabber: Хорошее объяснение :) - person Ram Ghadiyaram; 13.10.2016
comment
спасибо @RamPrasadG - person Gabber; 14.10.2016

Я не уверен, что тебе нужно это делать. Вы можете кэшировать rdd и проверить размер в пользовательском интерфейсе Spark. Но допустим, что вы действительно хотите сделать это программно, вот решение.

    def calcRDDSize(rdd: RDD[String]): Long = {
        //map to the size of each string, UTF-8 is the default
        rdd.map(_.getBytes("UTF-8").length.toLong) 
           .reduce(_+_) //add the sizes together
    }

Затем вы можете вызвать эту функцию для своих двух RDD:

println(s"peopleRdd is [${calcRDDSize(peopleRdd)}] bytes in size")
println(s"newRdd is [${calcRDDSize(newRdd)}] bytes in size")

Это решение должно работать, даже если размер файла больше, чем доступная в кластере память.

person Patrick McGloin    schedule 26.08.2015
comment
Я не хочу кэшировать RDD. Он будет получать данные в памяти, при этом не требуется. - person bob; 27.08.2015
comment
Привет, можно ли также преобразовать в фрейм данных? - person Venu A Positive; 28.01.2016

В документе Spark API говорится, что:

  1. Вы можете получить информацию о своих RDD из контекста Spark: sc.getRDDStorageInfo
  2. Информация RDD включает память и размер диска: Документ RDDInfo
person Little Bobby Tables    schedule 27.08.2015