Как можно записать в несколько выводов, зависящих от ключевого Spark использования в единственном Job.
Похожие страницы: Запишите в несколько выводов ключом, Ошпаривающим Hadoop, одно Задание MapReduce
Например.
sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)
удостоверился бы cat prefix/1
a
b
и cat prefix/2
был бы
c
Править: Я недавно добавил новый ответ, который включает полный импорт, сутенера и кодек сжатия, см. https://stackoverflow.com/a/46118044/1586965, который может быть полезным в дополнение к более ранним ответам.
Это включает кодек согласно просьбе, необходимый импорт и сутенера согласно просьбе.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
// TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) {
def writeAsMultiple(prefix: String, codec: String,
keyName: String = "key")
(implicit sqlContext: SQLContext): Unit = {
import sqlContext.implicits._
rdd.toDF(keyName, "_2").write.partitionBy(keyName)
.format("text").option("codec", codec).save(prefix)
}
}
val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
Одно тонкое различие для OP - то, что он снабдит префиксом <keyName>=
к именам каталогов. Например,
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
дал бы:
prefix/key=1/part-00000
prefix/key=2/part-00000
то, где prefix/my_number=1/part-00000
содержал бы строки a
и b
, и prefix/my_number=2/part-00000
, будет содержать строку c
.
И
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")
дал бы:
prefix/foo=1/part-00000
prefix/foo=2/part-00000
должно быть ясно, как отредактировать для [1 112].
Наконец ниже пример для [1 113], который, возможно, более хорош то использование Кортежи.
implicit class PimpedDataset[T](dataset: Dataset[T]) {
def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
dataset.write.partitionBy(field)
.format("text").option("codec", codec).save(prefix)
}
}
При использовании Spark 1.4 + это стало многим, намного легче благодаря DataFrame API. (DataFrames были представлены в Spark 1.3, но partitionBy()
, то, в котором мы нуждаемся, было представлено в 1,4 .)
, Если Вы начинаете с RDD, необходимо будет сначала преобразовать его в DataFrame:
val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")
В Python, этот тот же код:
people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])
, После того как у Вас есть DataFrame, пишущий в несколько выводов на основе конкретного ключа, просто. Что более - и это - красота DataFrame API - код является в значительной степени тем же через Python, Scala, Java и R:
people_df.write.partitionBy("number").text("people")
И можно легко использовать другие выходные форматы, если Вы хотите:
people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")
В каждом из этих примеров, Spark создаст подкаталог для каждого из ключей, на которых мы разделили DataFrame:
people/
_SUCCESS
number=1/
part-abcd
part-efgh
number=2/
part-abcd
part-efgh
Я сделал бы это как это, которое является масштабируемо
import org.apache.hadoop.io.NullWritable
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
override def generateActualKey(key: Any, value: Any): Any =
NullWritable.get()
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
key.asInstanceOf[String]
}
object Split {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Split" + args(1))
val sc = new SparkContext(conf)
sc.textFile("input/path")
.map(a => (k, v)) // Your own implementation
.partitionBy(new HashPartitioner(num))
.saveAsHadoopFile("output/path", classOf[String], classOf[String],
classOf[RDDMultipleTextOutputFormat])
spark.stop()
}
}
, Просто видел подобный ответ выше, но на самом деле нам не нужны настроенные разделы. MultipleTextOutputFormat создаст файл для каждого ключа. Все в порядке, который несколько записывают с теми же ключами, попадают в тот же раздел.
новый HashPartitioner (цифра), где цифра является числом раздела, Вы хотите. В случае, если у Вас есть большое количество различных ключей, можно определить номер к большому. В этом случае каждый раздел не откроет слишком много hdfs обработчиков файлов.
Если у Вас потенциально есть много значений для данного ключа, я думаю, что масштабируемое решение состоит в том, чтобы выписать один файл на ключ на раздел. К сожалению, нет никакой встроенной поддержки этого в Spark, но мы можем сделать что-то на скорую руку.
sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.mapPartitionsWithIndex { (p, it) =>
val outputs = new MultiWriter(p.toString)
for ((k, v) <- it) {
outputs.write(k.toString, v)
}
outputs.close
Nil.iterator
}
.foreach((x: Nothing) => ()) // To trigger the job.
// This one is Local, but you could write one for HDFS
class MultiWriter(suffix: String) {
private val writers = collection.mutable.Map[String, java.io.PrintWriter]()
def write(key: String, value: Any) = {
if (!writers.contains(key)) {
val f = new java.io.File("output/" + key + "/" + suffix)
f.getParentFile.mkdirs
writers(key) = new java.io.PrintWriter(f)
}
writers(key).println(value)
}
def close = writers.values.foreach(_.close)
}
(Замена PrintWriter
с Вашим выбором операции распределенной файловой системы.)
Это делает единственную передачу по RDD и не выполняет перестановки. Это дает Вам один каталог на ключ со многими файлами в каждом.
Я имею подобную потребность и нашел путь. Но это имеет один недостаток (который не является проблемой для моего случая): необходимо повторно разделить Вас данные с одним разделом на выходной файл.
Для разделения таким образом это обычно требует для знания заранее, сколько файлов задание произведет и найдет функцией, которая отобразит каждый ключ к каждому разделу.
Первый позволяют нам создать наш находящийся в MultipleTextOutputFormat класс:
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T , V] {
override def generateFileNameForKeyValue(key: T, value: V, leaf: String) = {
key.toString
}
override protected def generateActualKey(key: T, value: V) = {
null
}
}
С этим Spark класса получит ключ от раздела (первое/последнее, я предполагаю), и назовите файл с этим ключом, таким образом, не хорошо смешаться, несколько включают тот же раздел.
Для Вашего примера, Вы потребуете пользовательского partitioner. Это сделает задание:
import org.apache.spark.Partitioner
class IdentityIntPartitioner(maxKey: Int) extends Partitioner {
def numPartitions = maxKey
def getPartition(key: Any): Int = key match {
case i: Int if i < maxKey => i
}
}
Теперь позволяют нам соединить все:
val rdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"), (7, "d"), (7, "e")))
// You need to know the max number of partitions (files) beforehand
// In this case we want one partition per key and we have 3 keys,
// with the biggest key being 7, so 10 will be large enough
val partitioner = new IdentityIntPartitioner(10)
val prefix = "hdfs://.../prefix"
val partitionedRDD = rdd.partitionBy(partitioner)
partitionedRDD.saveAsHadoopFile(prefix,
classOf[Integer], classOf[String], classOf[KeyBasedOutput[Integer, String]])
Это генерирует 3 файла под префиксом (названный 1, 2 и 7), обрабатывая все в одной передаче.
, Как Вы видите, Вам нужно некоторое знание о Ваших ключах, чтобы смочь использовать это решение.
Для меня это было легче, потому что мне был нужен один выходной файл для каждого ключевого хеша, и количество файлов находилось под моим контролем, таким образом, я мог использовать запас HashPartitioner для добиваний цели.
Я нуждался в том же самом в Java. Регистрация моего перевода ответ Scala Zhang Zhan к Java Spark пользователи API:
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
class RDDMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> {
@Override
protected String generateFileNameForKeyValue(A key, B value, String name) {
return key.toString();
}
}
public class Main {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("Split Job")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
String[] strings = {"Abcd", "Azlksd", "whhd", "wasc", "aDxa"};
sc.parallelize(Arrays.asList(strings))
// The first character of the string is the key
.mapToPair(s -> new Tuple2<>(s.substring(0,1).toLowerCase(), s))
.saveAsHadoopFile("output/", String.class, String.class,
RDDMultipleTextOutputFormat.class);
sc.stop();
}
}
saveAsText () и saveAsHadoop (...) реализованы на основе данных RDD, конкретно методом: PairRDD.saveAsHadoopDataset, который берет данные PairRdd, где это выполняется. Я вижу два возможных варианта: Если Ваши данные являются относительно маленькими в размере, Вы могли бы сохранить некоторое время внедрения путем группировки по RDD, создания нового RDD из каждого набора и использования этого RDD для записи данных. Что-то вроде этого:
val byKey = dataRDD.groupByKey().collect()
val rddByKey = byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)}
val rddByKey.foreach{ case (k,rdd) => rdd.saveAsText(prefix+k}
Примечание, что это не будет работать на большие наборы данных b/c материализация итератора в v.toSeq
, не могло бы уместиться в памяти.
другая опция я вижу, и на самом деле тот, который я рекомендовал бы в этом случае: самокрутка, путем прямого вызова hadoop/hdfs API.
Вот обсуждение, которое я запустил при исследовании этого вопроса: , Как создать RDDs из другого RDD?
У меня был подобный вариант использования, где я разделил входной файл на Hadoop HDFS в несколько файлов на основе ключа (1 файл на ключ). Вот мой код scala для искры
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
val hadoopconf = new Configuration();
val fs = FileSystem.get(hadoopconf);
@serializable object processGroup {
def apply(groupName:String, records:Iterable[String]): Unit = {
val outFileStream = fs.create(new Path("/output_dir/"+groupName))
for( line <- records ) {
outFileStream.writeUTF(line+"\n")
}
outFileStream.close()
}
}
val infile = sc.textFile("input_file")
val dateGrouped = infile.groupBy( _.split(",")(0))
dateGrouped.foreach( (x) => processGroup(x._1, x._2))
, я сгруппировал записи на основе ключа. Значения для каждого ключа записаны для разделения файла.
хорошие новости для пользователя Python в случае, у Вас есть много столбцы и Вы хотите сохранить все другие столбцы, не разделенные в формате CSV, который будет отказавший, если Вы будете использовать "текстовый" метод в качестве предложения Nick Chammas.
people_df.write.partitionBy("number").text("people")
сообщением об ошибке является "AnalysisException: источник данных u'Text поддерживает только отдельный столбец, и у Вас есть 2 столбца.'";
В искре 2.0.0 (моя тестовая среда является искрой hdp 2.0.0) пакет "com.databricks.spark.csv" теперь интегрируется, и это позволяет нам, сохраните текстовый файл, разделенный только на один столбец, посмотрите, что пример дует:
people_rdd = sc.parallelize([(1,"2016-12-26", "alice"),
(1,"2016-12-25", "alice"),
(1,"2016-12-25", "tom"),
(1, "2016-12-25","bob"),
(2,"2016-12-26" ,"charlie")])
df = people_rdd.toDF(["number", "date","name"])
df.coalesce(1).write.partitionBy("number").mode("overwrite").format('com.databricks.spark.csv').options(header='false').save("people")
[root@namenode people]# tree
.
├── number=1
│?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
├── number=2
│?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
└── _SUCCESS
[root@namenode people]# cat number\=1/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,alice
2016-12-25,alice
2016-12-25,tom
2016-12-25,bob
[root@namenode people]# cat number\=2/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,charlie
В моей искре 1.6.1 сред, код не бросил ошибки, однако существует только один сгенерированный файл. это не делится двумя папками.
Hope это может помочь.
У меня был подобный вариант использования. Я разрешил его в Java путем записи двух пользовательских классов, реализовав MultipleTextOutputFormat
и RecordWriter
.
Мой вход был JavaPairRDD<String, List<String>>
, и я хотел сохранить его в файле, названном его ключом со всеми строками, содержавшимися в его значении.
Вот код для моего MultipleTextOutputFormat
, реализация
class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {
@Override
protected String generateFileNameForKeyValue(K key, V value, String name) {
return key.toString(); //The return will be used as file name
}
/** The following 4 functions are only for visibility purposes
(they are used in the class MyRecordWriter) **/
protected String generateLeafFileName(String name) {
return super.generateLeafFileName(name);
}
protected V generateActualValue(K key, V value) {
return super.generateActualValue(key, value);
}
protected String getInputFileBasedOutputFileName(JobConf job, String name) {
return super.getInputFileBasedOutputFileName(job, name);
}
protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException {
return super.getBaseRecordWriter(fs, job, name, arg3);
}
/** Use my custom RecordWriter **/
@Override
RecordWriter<K, V> getRecordWriter(final FileSystem fs, final JobConf job, String name, final Progressable arg3) throws IOException {
final String myName = this.generateLeafFileName(name);
return new MyRecordWriter<K, V>(this, fs, job, arg3, myName);
}
}
Вот является кодом для моего RecordWriter
реализация.
class MyRecordWriter<K, V> implements RecordWriter<K, V> {
private RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat;
private final FileSystem fs;
private final JobConf job;
private final Progressable arg3;
private String myName;
TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap();
MyRecordWriter(RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, String myName) {
this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat;
this.fs = fs;
this.job = job;
this.arg3 = arg3;
this.myName = myName;
}
@Override
void write(K key, V value) throws IOException {
String keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName);
String finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath);
Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value);
RecordWriter rw = this.recordWriters.get(finalPath);
if(rw == null) {
rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3);
this.recordWriters.put(finalPath, rw);
}
List<String> lines = (List<String>) actualValue;
for (String line : lines) {
rw.write(null, line);
}
}
@Override
void close(Reporter reporter) throws IOException {
Iterator keys = this.recordWriters.keySet().iterator();
while(keys.hasNext()) {
RecordWriter rw = (RecordWriter)this.recordWriters.get(keys.next());
rw.close(reporter);
}
this.recordWriters.clear();
}
}
большая часть кода является точно тем же, чем в FileOutputFormat
. Единственной разницей являются те немного строк
List<String> lines = (List<String>) actualValue;
for (String line : lines) {
rw.write(null, line);
}
, Эти строки позволили мне писать каждую строку своего входа List<String>
на файле. Первый аргумент эти write
функция установлена на [1 112], чтобы не писать ключ на каждой строке.
В заключение, я только должен сделать этот вызов для записи моих файлов
javaPairRDD.saveAsHadoopFile(path, String.class, List.class, RDDMultipleTextOutputFormat.class);