[Amity Chat SDK] My subscribe callback on getMessages(Channel_ID) is not called each time I get new messages in chat room

Hello! I am facing a problem while receiving new messages in my chat room.
I have called messagesRepository.getMessages() and created .subscribe(
amityMessages->{
//callback
})

But the only thing I am getting is the output:
I/SocketEventListener: onEvent(main thread: false): v3.message.didCreate
I/com.ekoapp.ekosdk.internal.api.mapper.EkoObjectPersister: object changed: id: 613b5ea4387e8e29ba683835 changedObject: EkoInternalMessage

My callback isn’t called.

Can anyone help? Thanks in advance

Hello Marti :smiley:
It seems like your subscription got unsubscribed somehow. Could you please give me the full code snippet here please?

1 Like

This is my code, everything is now in onCreate() of MainActivity.
I am trying to test basic functionalities.
Capture

New Messages I am sending for translation, and after that to Adapter to show it in recyclerView.

And here is the option to enter and send message
Capture2
Here is all code that is using Chat SDK.

It seems like there’s an issue with threading. You may query those messages on the wrong thread, also updating UI states on the wrong thread as well.
Could you please try to specific subscribing thread and observing thread following by this code snippet :

 AmityChatClient.newMessageRepository()
            .getMessages(MY_CHANNEL_ID)
            .build()
            .query()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe {
              //update your ui states
            }

Does your Log.e("mainActivity", "new message") show up in logcat?

Only for first couple of messages yes and runs the subscribe callback, but after some number of messages not anymore, just this I sent you :frowning:

Okay, so the data pipeline is connected properly but when you sent a new message, the pipeline seems to stop responding correct?

I am receiving for example 15 messages and everything is ok. I send messages and they are shown on UI, but at some point it stops working everything, sending and receiving, both.

Alright, seems like the pipeline got disconnected somehow… may you add

 AmityChatClient.newMessageRepository()
            .getMessages(MY_CHANNEL_ID)
            .build()
            .query()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnError(//log to see what could be the error)
            .doOnDispose(//log to see if the pipeline is disposed some how)
            .subscribe {
              //update your ui states
            }

for us to better comprehend the situation.

Should I change query for sending message? Or it is ok?

probably good to add them to the message creation pipeline too. In order to verify that the message got sent successfully as well

What should I change?

doOnError(log)
doOnComplete(log)

in the message creation pipeline

doOnComplete is done every time successfully when message is sent, but nothing is happening on receiving messages, still the same, as earlier.
Btw onDispose is not available for getMessages().

@Trust is it normal that my subscribe callback is called for example 5 times after I send one message to chat room ?

Hello @marti :slight_smile:

You need to make sure that the pipeline is disposed once it’s unused.
Or else you could reduce the number of changes by using Rx functions such as throttleLatest() or throttleFirst() in the data pipeline

messageRepository.getMessages(YOUR_CHANNEL_ID)
            .build()
            .query()
            .throttleLatest(1, TimeUnit.SECONDS, true)
            .subscribe {  }

Hello @Trust, thanks for answering.
What does that Rx function does?

Hi Marti, the rx operation

throttleLatest(time: Int, timeUnit: TimeUnit, emitLast: Boolean)

Throttles items from the upstream Observable by first emitting the next item from upstream, then periodically emitting the latest item (if any) when the specified timeout elapses between them.

@Trust this seems to work now, my subscribe callback is called and is receiving all messages that are been sent from other user each second. I will try to send more messages in one second to see if it will unsubscribe again (somehow)

@Trust can you tell me, if I have forgotten to do something that is mandatory.

  1. I created class that extends Application and in onCreate() called setup(API_KEY)

@Override
public void onCreate() {
super.onCreate();
AmityCoreClient.INSTANCE.setup(API_KEY);
}

  1. In my AppCompatActivity in onCreate() have registered device and in onDestroy() unregistered.
AmityCoreClient.INSTANCE.registerDevice(TEST_USER_ID)
            .displayName("Test user name")
            .build()
            .submit();

@Override
protected void onDestroy() {
super.onDestroy();
AmityCoreClient.INSTANCE.unregisterDevice();
}

  1. In the same onCreate() created channel, called getMessages like you told me with specifying

channelRepository = AmityChatClient.INSTANCE.newChannelRepository();
channelRepository.createChannel()
.communityType()
.withChannelId(MY_CHANNEL_ID)
.displayName(“Test channel”) // optional
.build()
.create()
.subscribe();

AmityChatClient.INSTANCE.newMessageRepository()
.getMessages(MY_CHANNEL_ID)
.build()
.query()
.throttleLatest(1, TimeUnit.SECONDS, true)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnError(throwable → {Log.e(“getMessages”,throwable.getMessage());})
.doOnCancel(() → {Log.e(“getMessages”,“doOnCancel”);})
.subscribe(amityMessages → {
if (amityMessages != null && amityMessages.size() > 0) {
Log.e(“SUBSCRIBE-METHOD”,"subscribe callback!!! new messages size: "
+ amityMessages.size() + “\n” +amityMessages.toString());
String myLastId = lastMessageId;
lastMessageId = amityMessages.get(amityMessages.size() - 1).getMessageId();
List newMessages = findNewMessages(myLastId, amityMessages);
if (newMessages != null){
Log.i(“subscribe”, “” + newMessages.size());
messagesAdapter.addAmityMessagesToList(newMessages);
messagesAdapter.notifyDataSetChanged();
}
}
});

  1. When creating message:

messageRepository.createMessage(MY_CHANNEL_ID)
.with()
.text(message_text)
.build()
.send()
.doOnError(throwable → {Log.e(“createMessage”,throwable.getMessage());})
.doOnDispose(() → {Log.e(“createMessage”,“doOnDispose”);})
.doOnComplete(() → {Log.i(“createMessage”,“message sent”);})
.subscribe();