Error: Illegal wire type for field Message.Field .protobuf.MessageTypeAck.sourceModuleID: 1 (0 expected)

I've app that procude&consume message with kafka and protocol buffer and everything works great. I'm serialize the protocol buffer with SerializeAsString() (this app was written in c++).

Now, I've added new node.js website that also consume messages and try to decode them.

My js code (using the great ProtoBuf.js module):

var builder = ProtoBuf.loadProtoFile("/home/aii/general/proto/All.proto"),
    protobuf = builder.build("protobuf"),
    Trace = protobuf.Trace,
    MessageType = protobuf.MessageType,
    MessageTypeAck = protobuf.MessageTypeAck,
    MessageTypeKeepAlive = protobuf.MessageTypeKeepAlive;

function getMessageType(val) {
  return Object.keys(MessageType).filter(function(key) {return MessageType[key] === val})[0]
}

consumer.on('message', function (message) {
    try{
      switch(getMessageType(message.key[0])) {
        case 'MESSAGE_TYPE_ACK':
          console.log(MessageTypeAck.decode(message.value));
          break;
        case 'MESSAGE_TYPE_KEEP_ALIVE':
          console.log(MessageTypeKeepAlive.decode(message.value));
          break;
        default:
          console.log("Unknown message type");
      }
    } catch (e){
      if (e.decoded) {
        var err = e.decoded;
        console.log(err);
      }
      else {
        console.log(e);
      }
    }
});

Result:

[Error: Illegal wire type for field Message.Field .protobuf.MessageTypeAck.sourceModuleID: 1 (0 expected)]

My proto files:

Trace.proto:

package protobuf;

message Trace {
    optional string topic = 1;
    optional int32 partition = 2;
    optional int64 offset = 3;
}

MessageType.proto

package protobuf;

enum MessageType {
    MESSAGE_TYPE_ACK = 1;
    MESSAGE_TYPE_KEEP_ALIVE = 2;
}

Messages.proto:

import "Trace.proto";

package protobuf;

message MessageTypeAck {
    repeated Trace trace = 1;

    optional string sourceModuleName = 2;
    optional int32  sourceModuleID   = 3;
}

message MessageTypeKeepAlive {
    repeated Trace trace = 1;

    optional string sourceModuleName = 2;
    optional int32  sourceModuleID   = 3;
}

All.proto

import "Trace.proto"
import "MessageType.proto";
import "Messages.proto"

What am I doing wrong? (decode?)

Answers:

Answer

so, Thanks to this SO question&answer, I figured it out! The problem is related to the way I've consume the buffer (by kafka) - as utf-8 (default). It actually related to code which I didn't attached:

var kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    client = new kafka.Client('localhost:2181'),
    consumer = new Consumer(
        client,
        [
            { topic: 'Genesis', partition: 0 }
        ],
        {
            autoCommit: false,
            encoding: 'buffer'
        }
    ); 

and the solution was to add the encoding: 'buffer' line (the default is 'utf-8' as mentioned here).

Tags

Recent Questions

Top Questions

Home Tags Terms of Service Privacy Policy DMCA Contact Us

©2020 All rights reserved.