मैं एक फ़ाइल में लिखने से पहले एक स्पार्क डेटाफ्रेम को एक इंडेक्स कॉलम के आधार पर विभाजन की संख्या में विभाजित करना चाहता हूं। मैं यह नियंत्रित करना चाहता हूं कि डेटाफ़्रेम के आकार के आधार पर कितने विभाजन बनाने हैं और फिर partitionBy का उपयोग करके लकड़ी की छत फ़ाइल में लिखते समय उपयोग करें।

एक उदाहरण DataFrame होने:

 i     b
 0    11
 1     9
 2    13
 3     2
 4    15
 5     3
 6    14
 7    16
 8    11
 9     9
 10   17
 11   10

यह मानते हुए कि मैं कॉलम i में मानों के आधार पर 4 विभाजन बनाना चाहता हूं, तो विभाजन कॉलम g को दिए गए मानों के अनुरूप होंगे:

g    i     b
0    0    11
0    1     9
0    2    13
1    3     2
1    4    15
1    5     3
2    6    14
2    7    16
2    8    11
3    9     9
3   10    17
3   11    10

स्पार्क में ऐसा करने का पसंदीदा तरीका क्या है?

-2
Krzysztof Słowiński 11 फरवरी 2019, 17:21

1 उत्तर

सबसे बढ़िया उत्तर

हालांकि दस्तावेज़ीकरण का पालन करना थोड़ा कठिन लगता है, और प्रश्न पर कुछ धारणाएं बनाना - यानी यह 4 या बल्कि एन फाइलें (?) मजबूत>स्पार्क २.४ अनुकूलित उदाहरण जो २० रिकॉर्ड लेता है और उन्हें ४ समान रेंज वाले विभाजनों में विभाजित करता है और फिर उन्हें लिखता है। चल दर:

val list = sc.makeRDD((1 to 20)).map((_, 1,"2019-01-01", "2019-01-01",1,2,"XXXXXXXXXXXXXXXXXXXXXXXXXX"))

val df = list.toDF("customer_id", "dummy", "report_date", "date", "value_1", "value_2", "dummy_string")
df.show(false)

केवल कुछ प्रविष्टियाँ दिखा रहा है:

+-----------+-----+-----------+----------+-------+-------+--------------------------+
|customer_id|dummy|report_date|date      |value_1|value_2|dummy_string              |
+-----------+-----+-----------+----------+-------+-------+--------------------------+
|1          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|2          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|3          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|4          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|5          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|6          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|7          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
...

फिर - अच्छे उपाय के लिए कुछ अतिरिक्त छँटाई सहित - यह आवश्यक नहीं है, सभी प्रारूपों के साथ काम करना:

df.repartitionByRange(4, $"customer_id")
  .sortWithinPartitions("customer_id", "date", "value_1")
  .write
  .parquet("/tmp/SOQ6")

इसने 4 फाइलें दीं जैसा कि नीचे दी गई तस्वीर में है:

enter image description here

आप 4 फाइलें देख सकते हैं और नामकरण के पहले और आखिरी हिस्से स्पष्ट हैं। दौड़ना:

val lines = spark.read.parquet("/tmp/SOQ6/part-00000-tid-2518447510905190948-a81455f6-6c0b-4e02-89b0-57dfddf1fb97-1200-c000.snappy.parquet")
val words = lines.collect
lines.count

5 रिकॉर्ड और सामग्री को प्रकट करता है जिसे डेटाफ़्रेम के अनुसार लगातार क्रमित किया जाता है।

lines: org.apache.spark.sql.DataFrame = [customer_id: int, dummy: int ... 5 more fields]
 words: Array[org.apache.spark.sql.Row] = Array([1,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [2,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [3,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [4,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [5,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX])
res11: Long = 5

इसे सभी फाइलों पर चलाएं, लेकिन केवल एक दिखाएं।

अंतिम टिप्पणियां

क्या यह एक अच्छा विचार है यह एक अलग कहानी है, उदा। गैर-प्रसारित जॉइन के बारे में सोचें जो एक मुद्दा है।

इसके अलावा, मैं स्पष्ट रूप से 4 को हार्ड-कोड नहीं करूंगा, लेकिन N के लिए कुछ फॉर्मूला लागू करने के लिए विभाजन पर लागू किया जाएगा! उदा.:

val N = some calculation based on counts in DF and your cluster 
val df2 = df.repartition(N, $"c1", $"c2")

आपको डीएफ राइटर का परीक्षण करना होगा क्योंकि दस्तावेज पूरी तरह से स्पष्ट नहीं है।

EMR क्लस्टर पर 2M रिकॉर्ड, 4 फाइलों के साथ-साथ आउटपुट के संदर्भ में जाँच की गई।

1
thebluephantom 14 फरवरी 2019, 10:08