
How to let Spark 2.0 read multiple Parquet folders like CSV

Problem description:

I have some daily data to save into multiple folders (mostly partitioned by time). I have two formats for storing the files, Parquet and CSV, and I would like to use Parquet to save some space.

The folder structure looks like this:

[[email protected] raw]# tree
.
├── entityid=10001
│   └── year=2017
│       └── quarter=1
│           └── month=1
│               ├── day=6
│               │   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
│               └── day=7
│                   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
├── entityid=100055
│   └── year=2017
│       └── quarter=1
│           └── month=1
│               ├── day=6
│               │   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
│               └── day=7
│                   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
├── entityid=100082
│   └── year=2017
│       └── quarter=1
│           └── month=1
│               ├── day=6
│               │   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
│               └── day=7
│                   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
└── entityid=10012
    └── year=2017
        └── quarter=1
            └── month=1
                ├── day=6
                │   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet
                └── day=7
                    └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet

Now I have a Python list that stores all the folders that need to be read; each run should read only some of the folders, based on filter conditions.

folderList=df_inc.collect()
folderString=[]

for x in folderList:
    folderString.append(x.folders)

In [44]: folderString
Out[44]:
[u'/data/raw/entityid=100055/year=2017/quarter=1/month=1/day=7',
 u'/data/raw/entityid=10012/year=2017/quarter=1/month=1/day=6',
 u'/data/raw/entityid=100082/year=2017/quarter=1/month=1/day=7',
 u'/data/raw/entityid=100055/year=2017/quarter=1/month=1/day=6',
 u'/data/raw/entityid=100082/year=2017/quarter=1/month=1/day=6',
 u'/data/raw/entityid=10012/year=2017/quarter=1/month=1/day=7']

The files were written by:

df_join_with_time.coalesce(1).write.partitionBy("entityid","year","quarter","month","day").mode("append").parquet(rawFolderPrefix)

When I try to read the folders stored in folderString with df_batch=spark.read.parquet(folderString), I get the error java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.String.

If I save the files in CSV format and read them with the code below, it works fine. Is there any way to read such a list of folders as Parquet? Much appreciated!

In [46]: folderList=df_inc.collect()
    ...: folderString=[]
    ...:
    ...: for x in folderList:
    ...:     folderString.append(x.folders)
    ...: df_batch=spark.read.csv(folderString)
    ...:

In [47]: df_batch.show()

+------------+---+-------------------+----------+----------+
|         _c0|_c1|                _c2|       _c3|       _c4|
+------------+---+-------------------+----------+----------+
|6C25B9C3DD54|  1|2017-01-07 00:00:01|1483718401|1483718400|
|38BC1ADB0164|  3|2017-01-06 00:00:01|1483632001|1483632000|
|38BC1ADB0164|  3|2017-01-07 00:00:01|1483718401|1483718400|

Answer:

You have a misunderstanding of how partitioning works in Hadoop and Parquet.

See, I have a simple file structure partitioned by year-month. It is like this:

my_folder
.
├── year-month=2016-12
|   └── my_files.parquet
├── year-month=2016-11
|   └── my_files.parquet

If I read from my_folder without any filter in my DataFrame reader, like this:

df = spark.read.parquet("path/to/my_folder")
df.show()

If you check the Spark DAG visualization, you can see that in this case it reads all my partitions, just as you said. (The original answer includes a DAG screenshot here; each point in its first stage is one partition of my data.)

But if I change my code to this:

from pyspark.sql.functions import col, lit

df = spark.read.parquet("path/to/my_folder")\
         .filter((col('year-month') >= lit(my_date.strftime('%Y-%m'))) &
                 (col('year-month') <= lit(my_date.strftime('%Y-%m'))))

The DAG visualization will then show how many partitions are actually being read. (The original answer includes a second DAG screenshot here.)

So, if you filter on the column that is the partition key, Spark will not read all the files, only the ones you need; you don't need that workaround of reading folder by folder.
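For the folder layout in your question, that would look roughly like this (a sketch; the entityid and day values are just the ones from your example paths, and Spark's partition pruning skips the folders that don't match):

from pyspark.sql.functions import col

# Read the whole partitioned dataset once...
df = spark.read.parquet("/data/raw")

# ...then filter on the partition columns; only matching folders are scanned.
df_batch = df.filter(col("entityid").isin([10012, 100055, 100082]) &
                     (col("year") == 2017) & (col("month") == 1) &
                     col("day").isin([6, 7]))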

Answer:

I got this solved by:

df=spark.read.parquet(folderString[0])
y=0
for x in folderString:
    if y>0:
        df=df.union(spark.read.parquet(x))
    y=y+1

It's a very ugly solution; if you have a better idea, please let me know. Many thanks.
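The same union loop can also be written a bit more compactly (a sketch, assuming folderString is non-empty):

from functools import reduce

# Build one DataFrame per folder and union them all together.
df = reduce(lambda a, b: a.union(b),
            (spark.read.parquet(p) for p in folderString))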

A few days later, I found the perfect way to solve the problem:

df=spark.read.parquet(*folderString)
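Note that when you read the leaf folders directly like this, the partition columns (entityid, year, quarter, month, day) are not part of the resulting DataFrame. If you still need them, one option (not from the original answer, just a sketch) is to set the basePath option so Spark can recover the partition values from the folder names:

# Keep the partition columns by telling Spark where the partitioned
# dataset starts; everything below basePath is parsed back into columns.
df = (spark.read
      .option("basePath", "/data/raw")
      .parquet(*folderString))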