I have an app that produces and consumes messages with Kafka and Protocol Buffers, and everything works great. I serialize the protocol buffers with SerializeAsString() (this app is written in C++).
Now I've added a new Node.js website that also consumes the messages and tries to decode them.
My JS code (using the great ProtoBuf.js module):
var ProtoBuf = require("protobufjs"); // ProtoBuf.js

var builder = ProtoBuf.loadProtoFile("/home/aii/general/proto/All.proto"),
    protobuf = builder.build("protobuf"),
    Trace = protobuf.Trace,
    MessageType = protobuf.MessageType,
    MessageTypeAck = protobuf.MessageTypeAck,
    MessageTypeKeepAlive = protobuf.MessageTypeKeepAlive;

// Reverse lookup: enum value -> enum name
function getMessageType(val) {
    return Object.keys(MessageType).filter(function(key) { return MessageType[key] === val; })[0];
}
consumer.on('message', function (message) {
    try {
        // The first byte of the Kafka message key carries the MessageType value
        switch (getMessageType(message.key[0])) {
            case 'MESSAGE_TYPE_ACK':
                console.log(MessageTypeAck.decode(message.value));
                break;
            case 'MESSAGE_TYPE_KEEP_ALIVE':
                console.log(MessageTypeKeepAlive.decode(message.value));
                break;
            default:
                console.log("Unknown message type");
        }
    } catch (e) {
        if (e.decoded) {
            // ProtoBuf.js attaches the partially decoded message to the error
            var err = e.decoded;
            console.log(err);
        } else {
            console.log(e);
        }
    }
});
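For reference, the reverse lookup in getMessageType works because ProtoBuf.js builds the enum as a plain name-to-value object; a minimal sketch of what it returns (my illustration, not output from the running app):

// MessageType as built: { MESSAGE_TYPE_ACK: 1, MESSAGE_TYPE_KEEP_ALIVE: 2 }
getMessageType(1); // 'MESSAGE_TYPE_ACK'
getMessageType(2); // 'MESSAGE_TYPE_KEEP_ALIVE'
getMessageType(9); // undefined -> hits the default case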
Result:
[Error: Illegal wire type for field Message.Field .protobuf.MessageTypeAck.sourceModuleID: 1 (0 expected)]
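For context: in the protobuf wire format each field starts with a tag whose low 3 bits are the wire type and whose remaining bits are the field number. sourceModuleID is an int32, so the decoder expects wire type 0 (varint); seeing wire type 1 (64-bit) means the bytes being read are not the bytes that were written. A minimal sketch of the tag layout (my illustration):

// Tag layout: (field_number << 3) | wire_type
// sourceModuleID is field 3, int32 => expected tag 0x18 (wire type 0 = varint)
var tag = 0x18;
console.log('field:', tag >> 3, 'wire type:', tag & 0x07); // field: 3 wire type: 0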
My proto files:
Trace.proto:
package protobuf;
message Trace {
    optional string topic = 1;
    optional int32 partition = 2;
    optional int64 offset = 3;
}
MessageType.proto:
package protobuf;
enum MessageType {
    MESSAGE_TYPE_ACK = 1;
    MESSAGE_TYPE_KEEP_ALIVE = 2;
}
Messages.proto:
import "Trace.proto";
package protobuf;
message MessageTypeAck {
    repeated Trace trace = 1;
    optional string sourceModuleName = 2;
    optional int32 sourceModuleID = 3;
}

message MessageTypeKeepAlive {
    repeated Trace trace = 1;
    optional string sourceModuleName = 2;
    optional int32 sourceModuleID = 3;
}
All.proto:
import "Trace.proto"
import "MessageType.proto";
import "Messages.proto"
What am I doing wrong? Is it the decode?
So, thanks to this SO question & answer, I figured it out! The problem was the way I consumed the buffer from Kafka: as utf-8 (the default). It was actually caused by code I didn't attach:
var kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    client = new kafka.Client('localhost:2181'),
    consumer = new Consumer(
        client,
        [
            { topic: 'Genesis', partition: 0 }
        ],
        {
            autoCommit: false,
            encoding: 'buffer' // the fix: hand the message to the app as raw bytes
        }
    );
The solution was to add the encoding: 'buffer' line (the default is 'utf8', as mentioned here).
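To see why the utf-8 default breaks protobuf payloads: any byte sequence that isn't valid utf-8 is replaced with the U+FFFD replacement character on the way to a string, so arbitrary binary data can't round-trip. A minimal sketch (my illustration):

var raw = Buffer.from([0x08, 0x99, 0x01]); // e.g. a varint field containing the byte 0x99
var asString = raw.toString('utf8');       // 0x99 is not valid utf-8 on its own
var back = Buffer.from(asString, 'utf8');
console.log(raw);  // <Buffer 08 99 01>
console.log(back); // <Buffer 08 ef bf bd 01> -- 0x99 became ef bf bd (U+FFFD)

That kind of corruption is exactly what shows up as an illegal wire type during decode.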