Category Archives: JMS

Parser logów w Groovy

Podczas debugowania wadliwego działania redeliveringu w ActiveMQ ostateczną metodą okazało się ‘echo dupa’ 😉 Jednakże przy próbie jednoczesnego obserwowania działania aplikacji przy 10 wiadomościach wpadających do kolejki okazało się, że metoda kartki i papieru zabiera trochę za dużo czasu. Zatem zaprzęgłem do pracy Grooviego i postanowiłem na szybko napisać mały parserek plików z logami.

Główne pomysły i kod podkradłem z tej strony. Jednakże ja do logów zapisywałem informację o tym, która to próba ponownego doręczenia się wykonuje, ilość łącznych, informację o wątku, który to wykonuje oraz informacji z samej wiadomości w kolejce JMS.  To oczywiście było dopisywane do wiersza ze wskazaniem informacji o wątku i daty. Całość wyglądała mniej więcej tak:

[#|2012-02-17T11:16:02.419+0100|INFO|sun-appserver9.1|javax.enterprise.system.stream.out|_ThreadID=80;_ThreadName=testowaKolejka-1;|CURRENT:4 MAX:4 id:identyfikator1-1-1-1|#]

I z tego narodził się poniższy kod. Być może komuś się przyda w przyszłości.

/* Mozna uzyc podejscia, ktore odczytuje plik linia po linii. Jednakze
 istnieje szansa, ze 1 wpis do loga zajmie wiecej niz domyslna 1 linie.
 Dlatego tez uzywamy podejscia, ktore wyciagnie plik do 1 lancucha tekstowego
 nastepnie zas kolejne przyporzadkowania do wzorca wyrazen regularnych.

 !!!UWAGA!!! - rozwiazanie moze byc malo wydajne przy duzych plikach

  */

def logLineStart = /^\[\#\|\d{4}-\d{2}-\d{2}/   // Wpis w logu o wyjatku
def log = new File('C:\\log.log').text
def informationEntries = []
def presentationData = [:]

def splitter = log =~ """(?xms)
    (    ${logLineStart}   .*?)
    (?=  ${logLineStart} | \\Z)
"""

splitter.each { matched, entry ->

    // Replace jest po to by otrzymac jeden lancuch tekstowy w przypadku dopasowan ze znakiem
    // nowej linii. Wowczas dane nie bylyby poprawne.
    if (entry =~ /CURRENT:/) {
         informationEntries.add( entry.replaceAll("[\r\n]","") )
    }

}

def redeliveryAttemptData = []

informationEntries.each {
    // Wpisy na liscie wygladaja mniej wiecej tak
    // [#|2012-02-17T15:49:50.525+0100|INFO|sun-appserver9.1|javax.enterprise.system.stream.out|_ThreadID=134;_ThreadName=testowaKolejka-1;|CURRENT:4 MAX:4 id:identyfikator1-1-1-1|#]

    // Zatem po prostu rozbijamy ten string i wyciagamy dane z listy
    redeliveryAttemptData = it.tokenize('|')

    String currentEntryData = redeliveryAttemptData.get(1)
    String currentEntryThread = redeliveryAttemptData.get(5)
    String currentEntryInfo = redeliveryAttemptData.get(6)

    // Wpierw potrzebna nam data i godzina z dokladnoscia do sekund
    def dataRegexp = /\d{2}:\d{2}:\d{2}\.\d{3}/
    def currentMinutesSec =  currentEntryData.find(dataRegexp)

    // Informacja o numerze watku, ktory procesowal wiadomosc
    def threadRegexp = /=jmsContainerEmailIn-\d{1,}/
    def currentThreadInfo =  currentEntryThread.find(threadRegexp).tokenize('-').get(1)

    // Logowane przeze mnie informacje o wiadomosci i metadane redeliveringu
    def listWithCurrentInfo = currentEntryInfo.tokenize(' ')
    def currentRound = listWithCurrentInfo.get(0).tokenize(':').get(1)
    def currentId = listWithCurrentInfo.get(2).tokenize(':').get(1)

    if( !presentationData[ currentId ] ) {
        presentationData[ currentId ] = [:]
    }

    presentationData[ currentId ][ 'id' ] = currentId
    presentationData[ currentId ][ currentRound ] = currentMinutesSec + '( ' + currentThreadInfo + ' )'

}

// Drukujemy na konsole w formie CSV. Mozna inaczej - stad
// zreszta zapisywanie danych do mapy
presentationData.each { key, value ->

    StringBuffer sb = new StringBuffer(64)

    sb.append( value.get('id')).append(',')

    for ( i in ['0','1','2','3','4'] ) {
        sb.append( value.get( i ) ).append(',')
    }

    println sb

}

Klaster ActiveMQ i dlaczego kolejność XMLa ma znaczenie

Jednym z elementów specyfikacji korporacyjnej Javy są Message Driven Beans, które zasadniczo wrzuca się do wora z napisem JMS (Java Message System). Jak zawsze pominę dywagacje teoretyczne co jest czym. Kiedy wezmę się za zdawanie certyfikatu biznesowego Oracla to na pewno sporo na ten temat napiszę. Dzisiaj jednak coś prostego i na szybko – zrobimy klaster brokerów ActiveMQ.

ActiveMQ jest produktem ze stajni Apache, który dostarcza funkcjonalności JMS, ale poza kontenerem JEE. Ma to swoje plusy – do obsługi JMS można oddelegować oddzielną maszynę i nie przejmować się padami serwera. Trochę łatwiej skalować prostą aplikacyjkę niż kolejne serwery aplikacyjne. W przypadku ActiveMQ nie jest również problemem stworzenie klastra brokerów, co umożliwia ciągłość działania w przypadku gdyby jeden z brokerów odmówił posłuszeństwa.

Samo ActiveMQ da się uruchomić z domyślnymi (na systemach *nixowych wystarczy nadać prawa do wykonywania plikowi activemq oraz zapisu do folderów z danymi i konfiguracją) ustawieniami i będzie to działało całkiem sprawnie. Każdy z brokerów posiada swoją nazwę, a także maszynę i port, na którym nasłuchuje. Domyślny plik konfiguracyjny wygląda nastepująco (wersja 5.5.1):

<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements. See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License. You may obtain a copy of the License at

 http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
 xmlns="http://www.springframework.org/schema/beans"
 xmlns:amq="http://activemq.apache.org/schema/core"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
 http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

 <!-- Allows us to use system properties as variables in this configuration file -->
 <bean>
 <property name="locations">
 <value>file:${activemq.base}/conf/credentials.properties</value>
 </property>
 </bean>

 <!--
 The <broker> element is used to configure the ActiveMQ broker.
 -->
 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data" destroyApplicationContextOnStop="true">

 <!--
 For better performances use VM cursor and small memory limit.
 For more information, see:

 http://activemq.apache.org/message-cursors.html

 Also, if your producer is "hanging", it's probably due to producer flow control.
 For more information, see:
 http://activemq.apache.org/producer-flow-control.html
 -->

 <destinationPolicy>
 <policyMap>
 <policyEntries>
 <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
 <pendingSubscriberPolicy>
 <vmCursor />
 </pendingSubscriberPolicy>
 </policyEntry>
 <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
 <!-- Use VM cursor for better latency
 For more information, see:

 http://activemq.apache.org/message-cursors.html

 <pendingQueuePolicy>
 <vmQueueCursor/>
 </pendingQueuePolicy>
 -->
 </policyEntry>
 </policyEntries>
 </policyMap>
 </destinationPolicy>

 <!--
 The managementContext is used to configure how ActiveMQ is exposed in
 JMX. By default, ActiveMQ uses the MBean server that is started by
 the JVM. For more information, see:

 http://activemq.apache.org/jmx.html
 -->
 <managementContext>
 <managementContext createConnector="false"/>
 </managementContext>

 <!--
 Configure message persistence for the broker. The default persistence
 mechanism is the KahaDB store (identified by the kahaDB tag).
 For more information, see:

 http://activemq.apache.org/persistence.html
 -->
 <persistenceAdapter>
 <kahaDB directory="${activemq.base}/data/kahadb"/>
 </persistenceAdapter>

 <!--
 The systemUsage controls the maximum amount of space the broker will
 use before slowing down producers. For more information, see:

 http://activemq.apache.org/producer-flow-control.html

 <systemUsage>
 <systemUsage>
 <memoryUsage>
 <memoryUsage limit="20 mb"/>
 </memoryUsage>
 <storeUsage>
 <storeUsage limit="1 gb"/>
 </storeUsage>
 <tempUsage>
 <tempUsage limit="100 mb"/>
 </tempUsage>
 </systemUsage>
 </systemUsage>
 -->

 <!--
 The transport connectors expose ActiveMQ over a given protocol to
 clients and other brokers. For more information, see:

 http://activemq.apache.org/configuring-transports.html
 -->
 <transportConnectors>
 <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
 </transportConnectors>

 </broker>

 <!--
 Enable web consoles, REST and Ajax APIs and demos

 Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
 -->
 <import resource="jetty.xml"/>

</beans>
<!-- END SNIPPET: example -->

By stworzyć klaster nie musimy się specjalnie wysilać. Wystarczy uruchomić 2 razy ActiveMQ. Można do tego użyć dokładnie tego samego kodu (lokalizacji na dysku), a użyć tylko innych plików startowych oraz konfiguracji (rzecz jasna każdy broker musi nasłuchiwać na oddzielnym porcie). U mnie rzecz była o tyle łatwiejsza, iż klaster miał być zbudowany z użyciem 2 oddzielnych maszyn. W związku z czym skopiowałem folder z całą aplikacją i konfiguracją na drugi serwer i jedynym co zmieniłem była nazwa brokera (parametr brokerName w elemencie broker).

Uruchomienie tym samym 2 oddzielnych węzłów nie było problemem – oba działały bez zastrzeżeń. Jednakże jak wspomniałem celem było stworzenie klastra brokerów. A tym samym obydwa węzły powinny widzieć siebie nawzajem.

Musimy wpierw odwiedzić też skrypty startujące AMQ i odnaleźć zmienną ACTIVEMQ_QUEUEMANAGERURL i nadać jej adres nie localhost, ale pełnej nazwy domeny – np. tcp://broker1.chlebik.pl:61616.

Potrzebna do klastra jest też zmiana elementu transportConnectors w konfiguracji brokera. U mnie wygląda on po zmianie tak:


<transportConnectors>
 <transportConnector name="openwire" uri="tcp://broker1.chlebik.pl:61616" updateClusterClients="true"
 rebalanceClusterClients="true" updateClusterClientsOnRemove="true" />
 </transportConnectors>

Uwaga! od razu ostrzegam, że przestawione domeny/porty są prezentacyjne i nie istnieją naprawdę. A jeśli już to na pewno nie są dostępne poprzez sieć 😉

Podobnie konfigurujemy (zmieniając dane) na drugim brokerze. Ich uruchomienie oddzielnie powinno się powieść bez problemu – po prostu dwie oddzielne instancje. By się zobaczyły wzajemnie musimy dodać do konfiguracji każdego z nich element o nazwie networkConnectors. W przypadku konfiguracji brokera słuchającego pod adresem tcp://broker1.chlebik.pl:61616 wyglądać będzie to tak:


<networkConnectors>
 <networkConnector uri="static:(tcp://broker2.chlebik.pl:61616)" conduitSubscriptions="false" />
</networkConnectors>

Oczywiście dla broker2 adres będzie odnosił się do broker1. I tutaj najważniejsza informacja – podana konfiguracja u mnie nie działała. Pomimo przeorania tutoriali w sieci oraz dokumentacji jakoś nikt nie wspomniał, iż plik konfiguracyjny ActiveMQ jest wrażliwy na kolejność elementów konfiguracyjnych!!! I by temat zadziałał element networkConnectors najbezpieczniej jest umieścić zaraz za elementem destinationPolicy. Dopiero wtedy przy odpaleniu drugiego brokera otrzymamy w logach coś na kształt:

2012-01-16 20:02:28,078 | INFO  | Establishing network connection from vm://broker1?async=false&network=true to tcp://broker2.chlebik.pl:61616 | org.apache.activemq.network.DiscoveryNetworkConnector | main

Lub coś podobnego do wyżej przedstawionego. Mi ten niuans zmarnował kilka godzin – mam nadzieję, że tym wpisem uchroniłem kogoś przed podobnym niebezpieczeństwem.