
python - Celery task in Flask for uploading and resizing images and storing it to Amazon S3

Question:

I'm trying to create a Celery task that resizes an uploaded image and stores it to Amazon S3, but it doesn't work as expected. Without the task, everything works fine. This is the code so far:

Stacktrace:

Traceback (most recent call last):
  File "../myVE/lib/python2.7/site-packages/kombu/messaging.py", line 579, in _receive_callback
    decoded = None if on_m else message.decode()
  File "../myVE/lib/python2.7/site-packages/kombu/transport/base.py", line 147, in decode
    self.content_encoding, accept=self.accept)
  File "../myVE/lib/python2.7/site-packages/kombu/serialization.py", line 187, in decode
    return decode(data)
  File "../myVE/lib/python2.7/site-packages/kombu/serialization.py", line 74, in pickle_loads
    return load(BytesIO(s))
  File "../myVE/lib/python2.7/site-packages/werkzeug/datastructures.py", line 2595, in __getattr__
    return getattr(self.stream, name)
  File "../myVE/lib/python2.7/site-packages/werkzeug/datastructures.py", line 2595, in __getattr__
    return getattr(self.stream, name)
  ...
RuntimeError: maximum recursion depth exceeded while calling a Python object
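The recursion appears to come from FileStorage's `__getattr__`, which forwards unknown attributes to `self.stream`. Unpickling on the worker rebuilds the object without running `__init__`, so `stream` does not exist yet, and the first delegated lookup recurses forever. A minimal stdlib-only reproduction (here `Proxy` is a stand-in for FileStorage; on Python 3 the error surfaces as RecursionError, a subclass of the RuntimeError shown above):

```python
import io

class Proxy:
    """Stand-in for FileStorage: delegates unknown attributes to the stream."""
    def __init__(self, stream):
        self.stream = stream

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        return getattr(self.stream, name)

# Normal use works: 'read' is delegated to the wrapped stream.
p = Proxy(io.BytesIO(b"data"))
print(p.read())  # b'data'

# pickle rebuilds instances without calling __init__, so 'stream' is missing;
# __getattr__('read') -> self.stream -> __getattr__('stream') -> ... forever.
broken = Proxy.__new__(Proxy)
try:
    broken.read
    error = None
except RecursionError as e:
    error = type(e).__name__
print(error)  # RecursionError
```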

views.py

from PIL import Image
from flask import Blueprint, redirect, render_template, request, url_for

from myapplication.forms import UploadForm
from myapplication.tasks import upload_task

main = Blueprint('main', __name__)

@main.route('/upload', methods=['GET', 'POST'])
def upload():
    form = UploadForm()
    if form.validate_on_submit():
        upload_task.delay(form.title.data, form.description.data,
                          Image.open(request.files['image']))
        return redirect(url_for('main.index'))
    return render_template('upload.html', form=form)

tasks.py

from StringIO import StringIO

from flask import current_app

from myapplication.extensions import celery, db
from myapplication.helpers import resize, s3_upload
from myapplication.models import MyObject

@celery.task(name='tasks.upload_task')
def upload_task(title, description, source):
    stream = StringIO()
    target = resize(source, current_app.config['SIZE'])
    target.save(stream, 'JPEG', quality=95)
    stream.seek(0)
    obj = MyObject(title=title, description=description, url=s3_upload(stream))
    db.session.add(obj)
    db.session.commit()

Thank you

Answer:

It looks like you are attempting to pass the entire uploaded file as part of the Celery message, and I imagine that is causing you trouble. I would recommend saving the file to the web server as part of the view, then having the message (the `delay` argument) contain the filename rather than the entire file's data. The task can then read the file from disk, upload it to S3, and delete the local copy.
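A minimal sketch of that approach using the stdlib's tempfile module (the names save_upload and upload_task are illustrative, not from the question; the real task body would call resize and s3_upload where the comment is):

```python
import io
import os
import tempfile

def save_upload(stream):
    """View side: persist the uploaded stream to a temp file and return its
    path. The path (a plain string) is what gets sent in the Celery message."""
    fd, path = tempfile.mkstemp(suffix=".jpg")
    with os.fdopen(fd, "wb") as f:
        f.write(stream.read())
    return path

def upload_task(title, description, path):
    """Worker side: read the file back from disk, process it, then clean up."""
    with open(path, "rb") as f:
        data = f.read()
    # ... resize the image and upload it to S3 here ...
    os.remove(path)  # delete the local copy once uploaded
    return len(data)

path = save_upload(io.BytesIO(b"fake image bytes"))
print(upload_task("title", "description", path))  # 16
```

In real code the view would call `upload_task.delay(title, description, path)`; since the path is just a string, it serializes cleanly with any Celery serializer.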

Answer:

I know this is a very old question, but I was struggling with passing the file's contents to the celery task. I would keep getting errors trying to follow what others have done. So I wrote this up, hoping it may help others in the future.

TL;DR

  • Send the file contents to the celery task with base64 encoding
  • Decode the data in the celery task and use io.BytesIO for the stream

Long answer

I was not interested in saving the image to disk and reading it again, so I wanted to pass the needed data to reconstruct the file in the background.

Trying to follow what others suggest, I kept getting encoding errors. Some of the errors were:

  • UnicodeDecodeError: 'utf-8' codec can't decode byte 0xff in position 0: invalid start byte
  • TypeError: initial_value must be str or None, not bytes

The TypeError was thrown by io.StringIO. Trying to decode the data to get rid of the UnicodeDecodeError did not make much sense, since the data is binary in the first place. Instead I used an io.BytesIO instance, and that worked perfectly. The only other thing I needed to do was base64-encode the file's stream, and then I was able to pass the content to the celery task.
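The round trip described above can be checked in isolation with nothing but the stdlib:

```python
import base64
import io

original = b"\xff\xd8\xff\xe0 fake JPEG bytes"  # binary data, not valid UTF-8

# Producer side: base64 gives an ASCII-safe representation of the bytes.
encoded = base64.b64encode(original)

# Worker side: decode back to bytes and wrap in BytesIO, not StringIO.
stream = io.BytesIO(base64.b64decode(encoded))
assert stream.read() == original
```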

Code samples

images.py

import base64

file_.stream.seek(0) # start from beginning of file
# some of the data may not be defined
data = {
  'stream': base64.b64encode(file_.read()),
  'name': file_.name,
  'filename': file_.filename,
  'content_type': file_.content_type,
  'content_length': file_.content_length,
  'headers': {header[0]: header[1] for header in file_.headers}
}

###
# add logic to sanitize required fields
###

# define the params for the upload (here I am using AWS S3)
bucket, s3_image_path = AWS_S3_BUCKET, AWS_S3_IMAGE_PATH
# import and call the background task
from async_tasks import upload_async_photo 
upload_async_photo.delay(
  data=data,
  image_path=s3_image_path,
  bucket=bucket)

async_tasks.py

import base64, io
from werkzeug.datastructures import FileStorage

@celery.task
def upload_async_photo(data, image_path, bucket):
    bucket = get_s3_bucket(bucket) # get bucket instance
    try:
        # decode the stream
        data['stream'] = base64.b64decode(data['stream'])
        # create a BytesIO instance
        # https://docs.python.org/3/library/io.html#binary-i-o
        data['stream'] = io.BytesIO(data['stream'])
        # create the file structure
        file_ = FileStorage(**data)
        # upload image
        bucket.put_object(
                Body=file_,
                Key=image_path,
                ContentType=data['content_type'])
    except Exception as e:
        print(str(e))

Edit

I also changed what content celery accepts and how it serializes data. To avoid having trouble passing the Bytes instance to the celery task, I had to add the following to my config:

CELERY_ACCEPT_CONTENT = ['pickle']
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
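If you would rather keep Celery's default JSON serializer instead of enabling pickle, one option is to decode the base64 output to a plain str before sending it; base64.b64decode accepts both str and bytes on the worker side. A small sketch:

```python
import base64

# Producer: b64encode returns bytes; .decode('ascii') makes it a JSON-safe str.
payload = base64.b64encode(b"\x89PNG raw bytes").decode("ascii")

# ... payload travels through the broker as a plain string ...

# Worker: b64decode happily takes the str and returns the original bytes.
restored = base64.b64decode(payload)
assert restored == b"\x89PNG raw bytes"
```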
Answer:

Old question, but I have just had the same problem. The accepted answer did not work for me (I'm using Docker instances, so Celery does not have access to the producer's filesystem; it's also slow to first save the file to the local filesystem).

My solution keeps the file in RAM, so it's much faster. The only downside is that if you need to handle large files (>1GB), you need a server with a lot of RAM.

The doc_file is of type werkzeug.datastructures.FileStorage (see the Werkzeug documentation).

Sending the file to celery worker:

entry.delay(doc_file.read(), doc_file.filename, doc_file.name, doc_file.content_length, doc_file.content_type, doc_file.headers)

Receiving the file:

from werkzeug.datastructures import FileStorage
from StringIO import StringIO

@celery.task()
def entry(stream, filename, name, content_length, content_type, headers):
    doc = FileStorage(stream=StringIO(stream), filename=filename, name=name, content_type=content_type, content_length=content_length)
    # Do something with the file (e.g save to Amazon S3)
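Note that on Python 3 this snippet would hit the TypeError mentioned in the previous answer, because `read()` returns bytes and StringIO only accepts str; swapping in io.BytesIO is enough, and the FileStorage construction stays the same. A quick check:

```python
import io

data = b"\xff\xd8\xff\xe0"  # what doc_file.read() returns on Python 3

try:
    io.StringIO(data)       # the StringIO approach from the snippet above
    failed = False
except TypeError:
    failed = True           # initial_value must be str or None, not bytes

stream = io.BytesIO(data)   # drop-in replacement for StringIO here
assert failed and stream.read() == data
```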