5. JMSで画像変換を非同期処理

前章で学んだJMSを使って、画像データをメッセージに詰めて送信し、画像変換処理をMessageListenerで非同期処理するようにしましょう。

まずは画像処理用のキューを追加するためにapplication.ymlを以下のように修正します。

spring:
  thymeleaf.cache: false
  main.show-banner: false
  hornetq:
    mode: embedded
    embedded:
      enabled: true
      queues: hello,faceConverter # 宛先名

次に送信処理を作成しましょう。

JmsMessagingTemplateはデフォルトでは

  • String
  • byte[]
  • Map
  • Serializable

の変換に対応しています。今回は非効率的ですが、送られたjavax.servlet.http.Partから取得した画像データをbyte配列に変換し、 MessageListener側でbyte配列からBufferedImageに変換し、OpenCVに渡します。

まずはControllerにリクエスト受付処理を追加しましょう。

package kanjava;

// ...
import org.springframework.util.StreamUtils;
// ...

@SpringBootApplication
@RestController
public class App {
    // ...

    @RequestMapping(value = "/queue", method = RequestMethod.POST)
    String queue(@RequestParam Part file) throws IOException {
        byte[] src = StreamUtils.copyToByteArray(file.getInputStream()); // InputStream -> byte[]
        Message<byte[]> message = MessageBuilder.withPayload(src).build(); // byte[]を持つMessageを作成
        jmsMessagingTemplate.send("faceConverter", message); // convertAndSend("faceConverter", src)でも可
        return "OK";
    }

    // ...
}

次に、宛先faceConverterに対するMessageListenerを追加しましょう。

@SpringBootApplication
@RestController
public class App {
    // ...

    @RequestMapping(value = "/queue", method = RequestMethod.POST)
    String queue(@RequestParam Part file) throws IOException {
        byte[] src = StreamUtils.copyToByteArray(file.getInputStream());
        Message<byte[]> message = MessageBuilder.withPayload(src).build();
        jmsMessagingTemplate.send("faceConverter", message);
        return "OK";
    }

    // ...

    @JmsListener(destination = "faceConverter", concurrency = "1-5")
    void convertFace(Message<byte[]> message) throws IOException {
        log.info("received! {}", message);
        try (InputStream stream = new ByteArrayInputStream(message.getPayload())) { // byte[] -> InputStream
            Mat source = Mat.createFrom(ImageIO.read(stream)); // InputStream -> BufferedImage -> Mat
            faceDetector.detectFaces(source, FaceTranslator::duker);
            BufferedImage image = source.getBufferedImage();
            // do nothing...
        }
    }
}

ここまでの内容を組み合わせれば、内容を理解できると思います。

$ curl -F 'file=@hoge.jpg' localhost:8080/queue
OK

サーバーログは以下のようになります。

2015-03-01 00:19:22.366  INFO 14014 --- [enerContainer-1] kanjava.App                              : received! GenericMessage [payload=byte[52075], headers={jms_redelivered=false, jms_deliveryMode=2, JMSXDeliveryCount=1, jms_destination=HornetQQueue[faceConverter], jms_priority=4, id=ba27919f-8758-58fc-9976-99262605295c, jms_timestamp=1425136762365, jms_expiration=0, jms_messageId=ID:2c46ab2c-bf5d-11e4-850e-eff6d41dec3e, timestamp=1425136762366}]
2015-03-01 00:19:22.512  INFO 14014 --- [enerContainer-1] kanjava.FaceDetector                     : 1 faces are detected!

この処理では結果がわかりませんね。

次に50リクエストを同時に送ってみましょう。

$ for i in `seq 1 50`;do curl -F 'file=@hoge.jpg' localhost:8080/queue; done
OKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOKOK

全てレスポンスは返ってきています。サーバーログはどうでしょうか。

#
# A fatal error has been detected by the Java Runtime Environment:
#
#  [thread 23815 also had an error]
#
# JRE version: Java(TM) SE Runtime Environment (8.0_20-b26) (build 1.8.0_20-b26)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.20-b23 mixed mode bsd-amd64 compressed oops)
# Problematic frame:
# C  [libopencv_objdetect.2.4.dylib+0xe307]  cv::HaarEvaluator::operator()(int) const+0x23
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /xxxx/hs_err_pid14014.log
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.sun.com/bugreport/crash.jsp
# The crash happened outside the Java Virtual Machine in native code.
# See problematic frame for where to report the bug.
#

JVMがハングしています・・・。

実は、顔変換サービスの作成の段階でバグがありました。複数リクエストを同時に捌く際に起きているバグなので、 スレッドアンセーフによるバグですね。どこでしょうか。

JVMが落ちていることと、cv::HaarEvaluator::operator()(int)がヒントです。OpenCVの顔検出部分が怪しいです。

以下のハイライト部分がスレッドアンセーフです。

@JmsListener(destination = "faceConverter", concurrency = "1-5")
void convertFace(Message<byte[]> message) throws IOException {
    log.info("received! {}", message);
    try (InputStream stream = new ByteArrayInputStream(message.getPayload())) {
        Mat source = Mat.createFrom(ImageIO.read(stream));
        faceDetector.detectFaces(source, FaceTranslator::duker); // この中の処理がスレッドアンセーフ!
        BufferedImage image = source.getBufferedImage();
        // do nothing...
    }
}

正確にはclassifier.detectMultiScale(source, faceDetections);の部分です。

classifierがステートフルなため、FaceDetectorをデフォルトのsingletonスコープで登録しているのが問題なようです。

都度インスタンスを作り直す、prototypeスコープに変更しましょう。

以下のように、コンポーネントスキャン対象のクラスに@Scopeアノテーションをつけてスコープを明示します。

@Component
@Scope(value = "prototype")
class FaceDetector {
    // ...
}

実はこれだけでは、期待通りには動きません。Springではインスタンスのライフサイクルは寿命の長い方に合わせられます。

すなわちsingletonスコープのAppコントローラーに対して、prototypeスコープのFaceDetectorをインジェクションしても、 faceDetectorフィールドは寿命の長いsingletonスコープとして振る舞います。

この関係を変える(faceDetectorフィールドをprototypeスコープとして振る舞わせる)ために、scoped-proxyという仕組みを導入します。

@ScopeproxyMode属性に以下のような設定を行います。

@Component
@Scope(value = "prototype", proxyMode = ScopedProxyMode.TARGET_CLASS)
class FaceDetector {
    // ...
}

これでFaceDetectorがProxyでラップされた状態でAppにインジェクションされるため、Appのスコープによらず、 faceDetectorフィールドはprototypeスコープでいられます。

この状態でAppクラスを再起動し、再度50リクエストを送ってみてください。FaceDetectorが毎回初期化され、無事全てのリクエストが捌かれているのがわかると思います。

注釈

FaceDetectorの初期化コストも大きいので、singletonスコープのままsynchronizedによる同期化を行っても良いです。 どちらの性能が良いかは、サーバースペックと同時リクエスト数次第です。

本章では画像処理を非同期に実行しました。またインスタンスのスコープについて学びました。以上で本章は終了です。

本章の内容を修了したらハッシュタグ「#kanjava_sbc #sbc05」をつけてツイートしてください。

次は非同期に実行した処理結果を通知するために、STOMPという別のメッセージングプロトコルを使用します。 次章ではまずはSTOMPをつかってみましょう。