Wednesday, May 25, 2016

Kafka demo at JavaCro conference

In my previous post (published almost a year ago), I tried to summarize the basic concepts of the distributed messaging system called Kafka, and to provide a very basic tutorial for people who are new to this product.
At the end of that post I promised to give another "how to" once version 0.9, which introduces the new consumer API, was released. Version 0.9 was released six months ago, so I am a little late with my promise, but better late than never :)

A few days ago the JavaCro 2016 conference was held in Rovinj, Croatia, where I presented basic Kafka concepts to (mostly) people of the Croatian Java community and demonstrated a simple Spring Boot application using Kafka 0.9. Since I already explained basic Kafka concepts in the previous post, this one will focus only on the new consumer API and how to use it.



Starting Kafka


So let's start with the setup.
Download the Confluent Platform 2.0.1 from http://packages.confluent.io/archive/2.0/confluent-2.0.1-2.11.7.zip and unzip it to a location of your choice. Let's call this location the Confluent home folder.

Now we can start the Kafka server, but as explained in my previous post, Kafka requires Apache ZooKeeper, which is used for managing cluster configuration information.

Navigate to the Confluent home folder and run:
bin/zookeeper-server-start etc/kafka/zookeeper.properties

And after it's up, a Kafka broker can be started (in a new terminal):
bin/kafka-server-start etc/kafka/server.properties

Kafka is now started and ready to use. We started only one broker for testing purposes, but starting more brokers is trivial: the only thing you need to prepare is another server.properties file with a different broker.id, port and log directory than the ones the first broker is using.

Demo App


This demo app is a Spring Boot web application which allows us to:
  • create Kafka topics,
  • start producers,
  • control producer speed,
  • start consumers,
  • control consumer message processing speed.
You can find the source code at the github repository.
The core package contains the most important classes: JavaCroProducer and JavaCroConsumer. These classes represent our Kafka clients and use the producer and consumer APIs to talk to the Kafka cluster.

Video of the demo presented.


In this video we can see how to add topics, producers and consumers, and how Kafka automatically re-balances consumers as they connect or disconnect. Gears are used to show the current speed of consumers and producers, and progress-bar-like widgets display the topic partitions and their current message counts.

Producer


First, let's take a look at JavaCroProducer and see what we need to start it.
In order to communicate with Kafka (to send some messages to it) we need to create and configure a KafkaProducer instance. It takes a Properties object as an argument, so we must instantiate one and fill it with the basic properties necessary for it to work.

        Properties config = new Properties();
        config.put("client.id", id);
        config.put("bootstrap.servers", "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put("acks", "1");

        KafkaProducer<String, String> producer = new KafkaProducer<>(config);

- The client.id property is used to uniquely identify our producer to the Kafka cluster. In this case it is passed as a constructor parameter of the JavaCroProducer class. It's actually a simple long, incremented every time we create a new JavaCroProducer instance.

- The bootstrap.servers property is a list of Kafka broker identifiers consisting of comma-separated host:port pairs. We must provide at least one broker identifier, and if we have more than one broker in the cluster it is recommended to provide two or more broker identifiers for redundancy. We don't have to list all brokers; one is actually enough for the producer to get all the info it needs, and the other identifiers are used just in case the first broker fails or some other reason prevents the producer from communicating with it.

- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG tells KafkaProducer which serializer to use for message keys when sending messages to Kafka. In this example the built-in String serializer is used.

- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG tells KafkaProducer which serializer to use for message values when sending messages to Kafka. In this example the built-in String serializer is used.

- The acks property tells KafkaProducer when to consider a message successfully sent to the Kafka broker. The value 0 means that the producer will consider the message sent as soon as it puts it into the socket buffer. The value 1 means that the message is considered sent when the partition leader broker writes it to the partition log, and finally the value all means that the producer will consider the message sent only after the partition leader and all of its in-sync replicas have written the message to the partition log.

Now we have our producer ready and we can use it to send messages. Something like this:

ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "msgKey", "msgValue");
producer.send(record);

We need a ProducerRecord instance to send a message, and its constructor takes three arguments:

- topic name : a String, the name of the Kafka topic to which you want to send the message.

- message key : an object, used to decide which partition the message will go to if the topic has more than one partition. If the message key is not null, every message with the same key will go to the same partition, so if we want all messages with the same business property (for example a sender id) to end up on the same partition, we can use that property as the message key.

- message value : a String, the actual message payload - the content of the message.
Note that consumers receive both the message key and the value, so you can use the key as a kind of extra message information holder.

There is an overloaded constructor which additionally takes a partition number, and it is used to explicitly set the target partition of the topic. KafkaProducer also provides a method (partitionsFor) for discovering how many partitions a specific topic has.
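For illustration, here is a rough sketch of that variant (hedged: it reuses the producer instance and the placeholder topic "topicName" from above; PartitionInfo lives in org.apache.kafka.common):

// ask the producer how many partitions the topic has
List<PartitionInfo> partitions = producer.partitionsFor("topicName");
System.out.println("Topic has " + partitions.size() + " partitions");

// send explicitly to partition 0, for example
ProducerRecord<String, String> explicitRecord =
        new ProducerRecord<>("topicName", 0, "msgKey", "msgValue");
producer.send(explicitRecord);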

The KafkaProducer.send() method is asynchronous, but if we want to send messages in a synchronous way (not really recommended) we can: the send method returns a Future, so we simply wait for the sending result by calling Future.get() on it.
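A minimal sketch of that synchronous variant (assuming the record and producer from the snippets above):

try {
    RecordMetadata metadata = producer.send(record).get(); // Future.get() blocks until the broker responds
    System.out.println("Written to partition " + metadata.partition() + " at offset " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    // sending failed or waiting was interrupted
    e.printStackTrace();
}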

If we need to examine the result of the send method without blocking the sending process, there is a way:
an overloaded version of the send method takes a second argument, an implementation of the org.apache.kafka.clients.producer.Callback interface.
Its method onCompletion(RecordMetadata metadata, Exception exception) will be called when message sending finishes, whether successfully or not. If the message was sent successfully we can read metadata from the RecordMetadata object (such as the topic the message was sent to, the partition it was written to and its offset within that partition). The second argument, Exception, will be null if no error occurred, and non-null otherwise.
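Here is a sketch of that non-blocking variant (again assuming the record and producer from above):

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            System.out.println("Sent to topic " + metadata.topic()
                    + ", partition " + metadata.partition()
                    + ", offset " + metadata.offset());
        } else {
            exception.printStackTrace(); // sending failed
        }
    }
});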

Consumer


JavaCroConsumer represents our consumer process and it uses KafkaConsumer to communicate with the Kafka cluster.
The first thing to do is to create and configure a KafkaConsumer object:

Properties config = new Properties();
config.put("client.id", id);
config.put("group.id", group);
        
config.put("bootstrap.servers", "localhost:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put("auto.offset.reset", "earliest");

consumer = new KafkaConsumer<String, String>(config);
consumer.subscribe(Arrays.asList(topic));

Let's go through all the KafkaConsumer config properties:

- client.id : a unique identifier of this consumer.

- group.id : the group we want this consumer to belong to. A group is a set of consumers handling a specific business task (a logical consumer).

- bootstrap.servers : a list of Kafka broker identifiers consisting of comma-separated host:port pairs. As with producers, we must provide at least one broker identifier, and if we have more than one broker in the cluster it is recommended to provide two or more broker identifiers for redundancy.

- ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG : the deserializer implementation class for deserializing message keys.

- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG : the deserializer implementation class for deserializing message values.

- auto.offset.reset : this property tells Kafka from which position on a partition we want a new consumer to start reading messages in case no other consumer from the group has already read from it.
The default behaviour is to start reading from the latest message existing on the partition at the time the consumer connected to the Kafka cluster. In our case we set it to earliest, because we want to start processing messages from the beginning of the partition.

After instantiating the KafkaConsumer object we must tell it which topics we want to read messages from - that is what the subscribe() call above does.

And now we can start reading messages:

while (!shutdown.get()) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    records.forEach(record -> processMessage(record));
}

Here we have a loop running until shutdown, and in each iteration it reads messages using the consumer.poll() method. It takes a timeout argument and blocks the current thread until messages are available or the timeout expires, whichever comes first. In both cases this method returns a ConsumerRecords object, which is a collection of ConsumerRecord instances. A consumer record is a key-value pair representing the message key and value, and it also contains some metadata of the message, such as the topic the message was read from, the number of the partition which holds the message and the message offset (position) within its partition.
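For illustration, processMessage() could look something like this (a hypothetical implementation, not necessarily what the demo app does - it just prints the metadata and the payload):

private void processMessage(ConsumerRecord<String, String> record) {
    System.out.println("topic=" + record.topic()
            + ", partition=" + record.partition()
            + ", offset=" + record.offset()
            + ", key=" + record.key()
            + ", value=" + record.value());
}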

A very important concept of the consumer API is consumer groups. The consumer group concept is Kafka's abstraction of the queuing and publish-subscribe models. Two (or more) consumers from the same group reading from the same topic will never get the same message (queuing model), but two (or more) consumers from different groups will (publish-subscribe model). This means that a consumer group behaves like a single logical consumer, and the individual consumers of the group (consumer nodes) are used for horizontal scaling of that logical consumer.

Since only one consumer from a group can read from a particular partition, in the case when the group has more consumers than the topic has partitions, all the extra consumers will be in a kind of hot standby - they will periodically ask Kafka for messages, but they will get an empty record collection. If one of the active consumers crashes, one of the extra consumer nodes will start to receive messages exactly from the position where the crashed node stopped processing them.
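If we want to observe this re-balancing from the consumer side, the subscribe() call can optionally take a ConsumerRebalanceListener (org.apache.kafka.clients.consumer.ConsumerRebalanceListener; TopicPartition is in org.apache.kafka.common). A hedged sketch, not part of the demo app:

consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Partitions taken away from this consumer: " + partitions);
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Partitions assigned to this consumer: " + partitions);
    }
});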

If there is a single consumer node in a group reading from a topic with two or more partitions, this consumer will get messages from all partitions.

By guaranteeing that only one consumer at a time can read from a partition, Kafka guarantees not just message delivery order but message processing order. Note that this guarantee applies at the partition scope, not for the entire topic (with multiple partitions), but this should be enough since we can almost always group messages onto partitions by some business property which is relevant for processing order. This is the main purpose of message keys: messages with the same key end up on the same partition, so if we use a business property relevant for processing order as the message key, we have things covered.
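For example (the "payments" topic and the sender id value here are made up, just to illustrate the idea):

String senderId = "customer-42";
// all messages of this sender land on the same partition and are therefore processed in order
producer.send(new ProducerRecord<>("payments", senderId, "some payload"));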

Yesterday Kafka 0.10 was released as part of Confluent Platform 3.0, but the consumer API, which this post focuses on, had its major change in version 0.9 and has not changed in 0.10, so everything said here still applies.

The latest version of the platform brings interesting things such as the Streams library and Control Center, and version 0.9 brought security features and Kafka Connect; I hope I will grab some time to blog about some of them soon.

Monday, June 22, 2015

Apache Kafka - First Contact

Finally I caught some time to dig into Kafka, a high-throughput distributed messaging system, as its authors define it.

Essentially, Kafka is a publish-subscribe messaging system. It can be used in several use case scenarios, but in this post we will focus on the traditional message broker use case.
Kafka stores every message on disk. Yet, it is very fast. If you are interested in how this is achieved, you can read about it later :)
It is distributed by design - it runs as a cluster comprised of one or more servers, each of which is called a broker.

Important concepts

- Kafka stores messages in categories called topics.
- Topics consist of one or more "partitions".
- Each partition is an ordered, immutable sequence of messages that is continually appended to.
- Processes that publish messages to Kafka are called producers.
- Processes that subscribe to topics and fetch messages are called consumers.
- Messages are not deleted after delivery, but after a configured period of time.

Partitions are replicated across the Kafka cluster. Partition copies are distributed among multiple brokers, but only one broker is the "partition leader", meaning that all writes and reads to that partition must be handled through that broker, while the other brokers are used only for redundancy. If the partition leader broker crashes, Kafka will automatically elect another leader among the brokers holding a copy of that partition.

Let the games begin!

Now that we know the basic stuff, let's play.
In this "how to" I am using Kafka 0.8.2.1, so the first step is to download it from this link.

After the download, unzip it, open a terminal console and go to the kafka directory you just extracted from the downloaded archive.

First, we must run Apache ZooKeeper since Kafka uses it for managing cluster configuration information, but no worries, it is included in the Kafka distribution and you only need to run it:
bin/zookeeper-server-start.sh config/zookeeper.properties
ZooKeeper should start and produce some output, with the last line looking something like this:
INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

Next step: open another terminal (unless you started ZooKeeper in the background), and let's run Kafka.
One node will be enough for this test:
bin/kafka-server-start.sh config/server.properties
Hopefully everything went fine, and we have a Kafka server running.

Finally, open a third terminal and create a topic for testing purposes:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic my-test-topic

We set the replication-factor argument to 1, which means we don't have replication (we currently have only one broker).
Also, we set the number of partitions to 2, which will be important later, when we talk about consumers.
As the last option we specify the topic name: "my-test-topic".
After we run this command, the following output should appear:
Created topic "my-test-topic".

Now we have Kafka up and running! It is just one node (broker), but running more nodes is not complicated (step 6 in the Kafka quickstart) and it's not so important for the purpose of this post, which is showing how to produce and consume some messages from a Java application.

Java time

We need the following dependency to set up our test application:
 <dependency>  
      <groupId>org.apache.kafka</groupId>  
      <artifactId>kafka_2.11</artifactId>  
      <version>0.8.2.1</version>  
 </dependency>  
You can get the complete example code at my github repository.

Producer


When publishing a message to the Kafka cluster, the producer must send it directly to the partition leader, so it must know its address. But have no fear, the producer API handles that part for you. All you have to do is provide the addresses of one or more brokers (at least one, but more is better for redundancy reasons) in the producer configuration. Any broker can be queried for metadata containing information about topics and partitions, so the API will use the list you provide to do exactly that before actually sending a message.
Let's define the producer properties:

Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
ProducerConfig config = new kafka.producer.ProducerConfig(props);

metadata.broker.list property : the addresses of one or more brokers for fetching metadata; we have only one broker, localhost:9092. For providing more broker addresses the format is: host1:port1,host2:port2
serializer.class property : defines the message serializer. We use the built-in StringEncoder class.
request.required.acks property : 1 means that the producer gets an acknowledgement after the leader replica has received the data, without waiting for other brokers to replicate it. If we had more replicas we could ask for more than one acknowledgement, ensuring better durability guarantees.

The configuration is ready, let's create the producer:
Producer<String, String> p = new Producer<>(config);
and we can start sending messages:
KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(topic, key, message);
p.send(keyedMessage);

OK, that will work, but... what is this argument named key?
Remember, when we created our topic in the terminal console, we specified that we want 2 partitions for it. Now it is time to explain why.

Our goal is to have more than one consumer consuming messages from this topic. In Kafka, only one consumer from a group can read messages from a specific partition, so the number of active consumers per topic is limited by the number of partitions in that topic. Having two partitions allows us to have two simultaneously active consumers.

So far so good: we have two partitions, and now we want the producer to distribute messages to both of them, right?
In order to do that, we must provide a message key (a String) which will be used to determine the target partition for each message. This is called semantic partitioning.
The default partition resolving strategy is implemented as hash(key) % numberOfPartitions, meaning the same key will always resolve to the same partition number (partitions inside a topic are numbered 0,1...P).
A KeyedMessage can be instantiated without a key: new KeyedMessage<>(topic, message), in which case Kafka should choose a random partition (according to the documentation), but in my tests all messages went to the first partition.
In our producer example we will also generate keys using modulo by numberOfPartitions in order to achieve a "round-robin" message distribution.
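As a side note, the old producer API also lets you plug in your own partitioning strategy via the partitioner.class property. A hedged sketch of what such a class might look like (the class name is made up; as far as I know the old API instantiates it reflectively and expects a constructor taking VerifiableProperties):

package hr.ib.kafka.test;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimpleModuloPartitioner implements Partitioner {

    // required by the old producer API
    public SimpleModuloPartitioner(VerifiableProperties props) {
    }

    @Override
    public int partition(Object key, int numPartitions) {
        // same idea as the default strategy: non-negative hash of the key, modulo number of partitions
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

It would be registered with props.put("partitioner.class", "hr.ib.kafka.test.SimpleModuloPartitioner");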

Here is the complete producer example:
package hr.ib.kafka.test;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class ProducerTest {


    public ProducerTest(String topic, int partitionCount, int msgProduceDelay) {

        Properties props = new Properties();

        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        ProducerConfig config = new kafka.producer.ProducerConfig(props);
        Producer<String, String> p = new Producer<String, String>(config);

        run(topic, partitionCount, msgProduceDelay, p);
    }


    private void run(String topic, int partitionCount, int msgProduceDelay, Producer<String, String> p) {

        int i = 0;
        while(true) {
            String message = "Message #" + (i + 1);
            String key = "" + i % partitionCount;
            KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(topic, key, message);
            p.send(keyedMessage);
            System.out.println("Sent message :" + message);
            sleep(msgProduceDelay);
            i++;
        }
    }


    private void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) {
        new ProducerTest("my-test-topic", 2, 500);
    }

}

Consumer


Before we dive into the consumer code, we must know a few things:

1. In Kafka, brokers do not push messages to consumers; consumers pull them.
2. Kafka has a "Simple API" (low level) and a "High level API" for consumers.
3. A consumer group defines a set of consumers for handling a specific business task. We can think of a consumer group as a logical subscriber to a specific topic.
4. For each partition, Kafka tracks a "consumer offset" for each consumer group - the number of the last message in the partition consumed by that consumer group.

The High level API handles offset tracking automatically, but if you want to use some advanced features, like starting to process messages from the beginning on consumer restart, you must use the Simple API and handle offset tracking yourself.
The High level API is enough for most use cases, and in this example we will use it.

As we already mentioned, one partition can be consumed by only one consumer from the same group at the same time.
Kafka implements this behavior using KafkaStream - only one stream per partition will be active at the same time. If we try to open more streams than the number of partitions in a topic, the "extra streams" will be opened but no messages will be fed to them.

First we have to create a consumer connector:
String zooKeeper = "localhost:2181";
String groupId = "myApp";
String topic = "my-test-topic"; 

Properties props = new Properties();
props.put("zookeeper.connect", zooKeeper);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");

ConsumerConfig consumerConfig = new ConsumerConfig(props);

consumerConnector =  Consumer.createJavaConsumerConnector(consumerConfig);

The consumer config is provided with the ZooKeeper location, so it can get all the Kafka cluster information it needs. And maybe the most important parameter here - group.id - is used to tell Kafka that this consumer is part of the "myApp" consumer group.
We have established a consumer connection, now we can get streams and consume them. We could fetch multiple streams and handle them in separate threads, but we will rather have each stream consumed in a separate JVM process, so we will request only one stream via this consumer connection, and start multiple consumer instances in order to handle more streams.
This is how we fetch consumer streams from the consumer connection:
 

Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);

List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

for (KafkaStream<byte[], byte[]> stream : streams) {
    StreamConsumer streamConsumer = new StreamConsumer(stream);
    new Thread(streamConsumer).start();
}


topicCountMap is used to define which streams we want. We can ask for streams from multiple topics, and specify how many streams we want for each topic. We just want one stream from our topic.
Stream consuming is pretty simple:
 
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
    System.out.println("Consumed message: " + new String(it.next().message()));
}

And here is the complete consumer example:
 
package hr.ib.kafka.test;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ConsumerTest {

    String zooKeeper = "localhost:2181";
    String groupId = "myApp";
    String topic = "my-test-topic";

    ConsumerConnector consumerConnector;

    public ConsumerTest() {

        Properties props = new Properties();
        props.put("zookeeper.connect", zooKeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");

        ConsumerConfig consumerConfig = new ConsumerConfig(props);

        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);

        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        for (KafkaStream<byte[], byte[]> stream : streams) {
            StreamConsumer streamConsumer = new StreamConsumer(stream);
            new Thread(streamConsumer).start();
        }
    }


    class StreamConsumer implements Runnable {

        private KafkaStream<byte[], byte[]> stream;

        public StreamConsumer(KafkaStream<byte[], byte[]> stream) {
            this.stream = stream;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = this.stream.iterator();
            while (it.hasNext()) {
                System.out.println("Consumed message: " + new String(it.next().message()));
            }
        }
    }


    public static void main(String[] args) {
        new ConsumerTest();
    }

}

Now we can run one producer and several consumers to see all of this in action.

I suggest following steps:

  1. Run ProducerTest and leave it running.
  2. Run one ConsumerTest and observe the output. You will notice it consumes all messages.
  3. Run a second ConsumerTest and give it a few seconds. Kafka will re-balance the streams, and the second consumer will start receiving messages (in my case the even numbered messages), while the first consumer will continue to consume only the odd numbered messages.
  4. Start a third ConsumerTest, and notice that it doesn't process any messages. That is because we only have two partitions, so the third Kafka stream will be empty. Leave it running anyway.
  5. Kill the second consumer, and observe the third one: it will soon start to process some messages. This is because Kafka re-balanced again after the disconnection of the second consumer and started to feed the third consumer's stream with messages.


And that's it.

I will close by noting that Kafka 0.9 will introduce a new consumer API, so stay tuned for another "how to" soon :)

Sunday, October 6, 2013

Apache Shiro with Spring framework, Java config and WebApplicationInitializer

Recently I was adding the Apache Shiro security framework to a Spring based web application which uses Java config and doesn't have any xml configuration at all, not even web.xml.

The Apache Shiro documentation mostly uses xml examples, so it took some time to put it all together in a Java config based application.

A central part of Shiro security is a realm. Here is how the official Shiro documentation defines realms:

"A Realm is a component that can access application-specific security data such as users, roles, and permissions. The Realm translates this application-specific data into a format that Shiro understands so Shiro can in turn provide a single easy-to-understand Subject programming API no matter how many data sources exist or how application-specific your data might be."

Shiro comes with a number of out-of-the-box Realm implementations that connect directly to a database, to LDAP, etc., but in this example we will use a custom Realm implementation since we want to access user data via our own user manager.

First, we have SecurityConfig.java, where all security related beans are defined.
package com.xxx.yyy.config;

import com.xxx.yyy.security.CustomSecurityRealm;
import org.apache.shiro.spring.LifecycleBeanPostProcessor;
import org.apache.shiro.spring.security.interceptor.AuthorizationAttributeSourceAdvisor;
import org.apache.shiro.web.mgt.DefaultWebSecurityManager;
import org.apache.shiro.web.mgt.WebSecurityManager;
import org.springframework.aop.framework.autoproxy.DefaultAdvisorAutoProxyCreator;
import org.springframework.beans.factory.config.MethodInvokingFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;

@Configuration
public class SecurityConfig {

    @Bean
    public CustomSecurityRealm customSecurityRealm(){
        return new CustomSecurityRealm();
    }

    @Bean
    public WebSecurityManager securityManager(){
        DefaultWebSecurityManager securityManager = new DefaultWebSecurityManager();
        securityManager.setRealm(customSecurityRealm());
        return securityManager;
    }

    @Bean
    public LifecycleBeanPostProcessor lifecycleBeanPostProcessor(){
        return new LifecycleBeanPostProcessor();
    }

    @Bean
    public MethodInvokingFactoryBean methodInvokingFactoryBean(){
        MethodInvokingFactoryBean methodInvokingFactoryBean = new MethodInvokingFactoryBean();
        methodInvokingFactoryBean.setStaticMethod("org.apache.shiro.SecurityUtils.setSecurityManager");
        methodInvokingFactoryBean.setArguments(new Object[]{securityManager()});
        return methodInvokingFactoryBean;
    }

    @Bean
    @DependsOn(value="lifecycleBeanPostProcessor")
    public DefaultAdvisorAutoProxyCreator defaultAdvisorAutoProxyCreator(){
        return new DefaultAdvisorAutoProxyCreator();
    }

    @Bean
    public AuthorizationAttributeSourceAdvisor authorizationAttributeSourceAdvisor(){
        AuthorizationAttributeSourceAdvisor authorizationAttributeSourceAdvisor = new AuthorizationAttributeSourceAdvisor();
        authorizationAttributeSourceAdvisor.setSecurityManager(securityManager());
        return authorizationAttributeSourceAdvisor;
    }

}

The first bean defined is our custom security realm implementation. We will take a look at it in a moment, but for now let's just see where it is used. And we don't have to go far: it is used by the Shiro security manager, defined as the second bean in SecurityConfig.java. We use DefaultWebSecurityManager since we plan to use Shiro for securing our application's URLs.
We just create an instance and inject our custom security realm bean into it.

After that we have a few more Shiro beans, and we just inject our security manager bean wherever required.

Let's look at what our custom security realm implementation looks like.
package com.xxx.yyy.security;

import com.xxx.yyy.security.Role;
import com.xxx.yyy.security.Permission;
import com.xxx.yyy.User;
import com.xxx.yyy.UserManager;
import org.apache.shiro.authc.*;
import org.apache.shiro.authc.credential.SimpleCredentialsMatcher;
import org.apache.shiro.authz.AuthorizationInfo;
import org.apache.shiro.authz.SimpleAuthorizationInfo;
import org.apache.shiro.authz.permission.WildcardPermission;
import org.apache.shiro.realm.AuthorizingRealm;
import org.apache.shiro.subject.PrincipalCollection;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

public class CustomSecurityRealm extends AuthorizingRealm {

    @Autowired
    private UserManager userManager;

    @Override
    protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principals) {

        Set<String> roles = new HashSet<>();
        Set<org.apache.shiro.authz.Permission> permissions = new HashSet<>();

        Collection<User> principalsList = principals.byType(User.class);
        for (User user : principalsList) {
            for (Role role : user.getRoles()) {
                roles.add(role.getName());
                for (Permission permission : role.getPermissions()) {
                    permissions.add(new WildcardPermission(permission.name()));
                }
            }
        }

        SimpleAuthorizationInfo info = new SimpleAuthorizationInfo(roles);
        info.setRoles(roles);
        info.setObjectPermissions(permissions);

        return info;
    }

    @Override
    protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken token) throws AuthenticationException {

        UsernamePasswordToken upat = (UsernamePasswordToken) token;
        User user = userManager.getByUsername(upat.getUsername());
        if(user != null && user.getPassword().equals(new String(upat.getPassword()))) {
            return new SimpleAuthenticationInfo(user, user.getPassword(), getName());
        }
        else {
            throw new AuthenticationException("Invalid username/password combination!");
        }
    }
}

Our security realm implementation will be used both for authentication and authorization, so we extend AuthorizingRealm, which in turn extends AuthenticatingRealm.
The autowired UserManager is our application's service for accessing users and their roles and permissions.

The doGetAuthenticationInfo method is used to authenticate a user, and it has one argument - an AuthenticationToken which holds the username and password entered by the user in the login form.
Inside this method we check whether a user with the given username exists and whether the stored password matches the password entered by the user. If those conditions are satisfied, we return an AuthenticationInfo object with our user object as the principal. We use Shiro's SimpleAuthenticationInfo implementation of the AuthenticationInfo interface.
If the user doesn't exist or the password doesn't match, we throw an AuthenticationException.
This is a very simple example; in a real project we would probably use Shiro's HashedCredentialsMatcher for checking the username/password combination, since we would want to store hashed passwords rather than plain text - see the sketch below.
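A hedged sketch of how that could look in SecurityConfig.java (HashedCredentialsMatcher lives in org.apache.shiro.authc.credential; the hash algorithm and iteration count here are just assumptions, pick whatever your application uses):

    @Bean
    public CustomSecurityRealm customSecurityRealm(){
        CustomSecurityRealm realm = new CustomSecurityRealm();
        // let Shiro compare hashes instead of comparing plain text passwords ourselves
        HashedCredentialsMatcher matcher = new HashedCredentialsMatcher("SHA-256");
        matcher.setHashIterations(1024);
        realm.setCredentialsMatcher(matcher);
        return realm;
    }

With a matcher configured, doGetAuthenticationInfo would simply return a SimpleAuthenticationInfo containing the stored hash (and salt, if one is used) and leave the comparison to Shiro, instead of calling equals() on the passwords.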

The doGetAuthorizationInfo method is used by Shiro to get roles and permissions for specific principal(s), so it has a PrincipalCollection as its argument. For every principal in the given collection (usually there will be only one) we get the roles and permissions and set them on the AuthorizationInfo which will be returned by this method. We use Shiro's SimpleAuthorizationInfo implementation for this purpose. The code should be pretty self-explanatory.

Now that we have our basic security infrastructure defined, we need to integrate it with our web application.
In order to protect URLs we need to add the Shiro filter to our web app descriptor. As I mentioned at the beginning of this post, we don't use web.xml; instead we have a WebApplicationInitializer.
Here is how it looks:

package com.xxx.yyy;

import com.xxx.yyy.config.DataConfig;
import com.xxx.yyy.config.SecurityConfig;
import com.xxx.yyy.config.WebConfig;
import org.springframework.web.WebApplicationInitializer;
import org.springframework.web.context.ContextLoaderListener;
import org.springframework.web.context.support.AnnotationConfigWebApplicationContext;
import org.springframework.web.filter.DelegatingFilterProxy;
import org.springframework.web.servlet.DispatcherServlet;

import javax.servlet.ServletContext;
import javax.servlet.ServletRegistration;

public class WebInitializer implements WebApplicationInitializer {

    @Override
    public void onStartup(ServletContext container) {

        // Create the 'root' Spring application context
        AnnotationConfigWebApplicationContext rootContext = new AnnotationConfigWebApplicationContext();
        rootContext.register( DataConfig.class, SecurityConfig.class);

        // Manage the lifecycle of the root application context
        container.addListener(new ContextLoaderListener(rootContext));


        // Create the dispatcher servlet's Spring application context
        AnnotationConfigWebApplicationContext dispatcherContext = new AnnotationConfigWebApplicationContext();
        dispatcherContext.setServletContext(container);
        dispatcherContext.setParent(rootContext);
        dispatcherContext.register(WebConfig.class);


        // Register and map the dispatcher servlet
        ServletRegistration.Dynamic dispatcher = container.addServlet("dispatcher", new DispatcherServlet(dispatcherContext));
        dispatcher.setLoadOnStartup(1);
        dispatcher.addMapping("/");


        container.addFilter("shiroFilter", new DelegatingFilterProxy("shiroFilterBean", dispatcherContext))
                           .addMappingForUrlPatterns(null, false, "/*");

    }
}
The code speaks for itself. We have the root context with DataConfig (which contains the JPA configuration, but that is not relevant for this story) and our SecurityConfig explained earlier.

In order to configure Spring MVC we added the dispatcher context and registered the WebConfig class, which contains the required beans.

The last registration is the most relevant one, since it defines the Shiro filter, which is configured to intercept all URLs.
We use DelegatingFilterProxy as the filter implementation, and we provide "shiroFilterBean" as the target bean name.
This bean is defined in our WebConfig class, so let's take a look at it:
package com.xxx.yyy;

import org.apache.shiro.spring.web.ShiroFilterFactoryBean;
import org.apache.shiro.web.mgt.WebSecurityManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.MessageSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.ReloadableResourceBundleMessageSource;
import org.springframework.web.servlet.LocaleResolver;
import org.springframework.web.servlet.config.annotation.*;
import org.springframework.web.servlet.i18n.CookieLocaleResolver;
import org.springframework.web.servlet.view.velocity.VelocityConfigurer;
import org.springframework.web.servlet.view.velocity.VelocityLayoutViewResolver;
import org.springframework.web.servlet.view.velocity.VelocityViewResolver;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@Configuration
@EnableWebMvc
@ComponentScan(basePackages = {"com.xxx.yyy.web"})
public class WebConfig extends WebMvcConfigurerAdapter {


    @Autowired
    private WebSecurityManager securityManager;


    @Bean
    public VelocityConfigurer velocityConfig() {
        VelocityConfigurer configurer = new VelocityConfigurer();
        configurer.setResourceLoaderPath("/WEB-INF/templates");
        Properties props = new Properties();
        props.put("output.encoding", "UTF-8");
        props.put("input.encoding", "UTF-8");
        configurer.setVelocityProperties(props);
        return configurer;
    }

    @Bean
    public VelocityViewResolver viewResolver() {
        VelocityViewResolver resolver = new VelocityLayoutViewResolver();
        resolver.setExposeSpringMacroHelpers(true);
        resolver.setContentType("text/html;charset=UTF-8");
        resolver.setSuffix(".vm");
        return resolver;
    }

    @Bean
    public MessageSource messageSource() {
        ReloadableResourceBundleMessageSource messageSource = new ReloadableResourceBundleMessageSource();
        messageSource.setBasenames("/WEB-INF/localization/messages");
        messageSource.setDefaultEncoding("UTF-8");
        messageSource.setCacheSeconds(10);
        return messageSource;
    }

    @Bean
    public LocaleResolver localeResolver() {
        CookieLocaleResolver localeResolver = new CookieLocaleResolver();
        localeResolver.setCookieName("LOCALE");
        return localeResolver;
    }


    @Bean
    public ShiroFilterFactoryBean shiroFilterBean(){
        ShiroFilterFactoryBean shiroFilter = new ShiroFilterFactoryBean();
        Map<String, String> definitionsMap = new HashMap<>();
        definitionsMap.put("/login.jsp", "authc");
        definitionsMap.put("/admin/**", "authc, roles[admin]");
        definitionsMap.put("/**", "authc");
        shiroFilter.setFilterChainDefinitionMap(definitionsMap);
        shiroFilter.setLoginUrl("/login.jsp");
        shiroFilter.setSecurityManager(securityManager);
        return shiroFilter;
    }
}


This is a typical Java config based Spring MVC configuration.
Besides the usual Spring MVC beans, we added a ShiroFilterFactoryBean (at the end) which is referenced from our WebApplicationInitializer, remember?
ShiroFilterFactoryBean requires a WebSecurityManager, and since we defined it in SecurityConfig.java, all we need to do here is autowire it into a private field and inject it into the shiroFilterBean bean definition.

In between we just have the Velocity template engine configuration beans as well as the localization beans, which are not relevant for the security framework.

Now, when a user attempts to access any application URL, the Shiro filter will intercept the request and delegate security checking to the shiroFilterBean, which will use the securityManager bean to determine whether the user has the right to access that specific URL.
If the user is not authenticated yet, the Shiro filter will redirect the user to the login page. Here is a simple example of login.jsp:

<form action="" method="post" name="loginform">
  <table align="left" border="0" cellpadding="3" cellspacing="0">
    <tr>
 <td>Username:</td>
 <td><input maxlength="30" name="username" type="text" /></td>
    </tr>
    <tr>
 <td>Password:</td>
 <td><input maxlength="30" name="password" type="password" /></td>
    </tr>
    <tr>
 <td align="left" colspan="2">
   <input name="rememberMe" type="checkbox" /><span style="font-size: x-small;">Remember Me</span>
 </td>
    </tr>
    <tr>
 <td align="right" colspan="2"><input name="submit" type="submit" value="Login" /></td>
    </tr>
  </table>
</form>


And that's it... we have a basic Shiro security setup. The way the application persists users, their roles and permissions is application specific. All that Shiro has to know about it, it will get from our security realm implementation.


UPDATE: Added /login.jsp to the ShiroFilterFactoryBean definitionsMap. Without it Shiro will not handle the login form submit correctly. Although it looks like we are restricting anonymous access to it, Shiro knows that it should allow it.


UPDATE 2: Added imports to the code snippets.

Sunday, September 15, 2013

Building native android app with Sencha Cmd on linux

Recently I started to work on a project with the Sencha Touch framework (a client requirement) and my first task was to build a simple screen with some GUI components and make it run on Android as a native app.

I was required to use Sencha Architect for development and app building, but since I use Linux, and Sencha Architect supports app packaging only on Windows and Mac, I had to use the Sencha Cmd utility for building the native app.

Before I go any further, let me throw in some info about my development environment:

OS: Ubuntu 13.04
Java 1.7
Ruby 1.9.3
Sencha Architect 2.2.2 build 991
Sencha Cmd v3.1.3.372


After reading the quick start docs it all looked easy, but as it often happens when using new tools, I quickly ran into some frustrating problems.

I made a simple app, tested it in the browser, and I thought I was ready to package it for Android. Before building the native app, I configured the packager file (which contains the mobile app configuration, such as the app name, package id, Android API level, Android SDK path etc...)

I fired up a terminal, navigated to the project folder and, as the Sencha documentation instructs, ran the build command: sencha app build native

And... after a few seconds my apk is generated... NOOOT!



First bump: the tool complained that it was missing nimblekit.jar.

I haven't found anything about this in the official Sencha documentation, but some people out there on the net experienced the same problem and found the solution: copy the st-res folder from $sencha_cmd_install/stbuild to your project folder.
I am not sure why Sencha Cmd doesn't know where to find the st-res folder; the documentation doesn't say anything about it (AFAIK). The same problem happens on Windows. However, copying the st-res folder solves the problem both on Linux and Windows.




OK, problem solved, let's continue...

sencha app build native ... and... after a few seconds my apk is generated... NOOOT!

The second problem caused me to say "WTF" quite a few times.

The command runs for a long time (a minute or two) and then reports a "Zip add failed" error on the Icon.png file.
Since I hadn't changed the icon configuration in the packager component, I expected the default icons to be used.
After unsuccessfully googling for the problem and running the command a few times, I noticed that the Icon.png file in build/native/res/drawable-ldpi grows up to 2 GB!!! The same goes for drawable-mdpi.
But in drawable-hdpi the icon size is normal, about 3.5 KB. I checked the packager icon config and realized that the default icon config doesn't define low dpi and medium dpi icons at all.
On Windows this problem doesn't appear even though the icon config is the same.
Obviously, there is some bug in the Linux version of Sencha Cmd that causes the problem.
I managed to solve the problem by defining low and medium dpi icons (36 and 48 px).


Since I'm pretty new to Sencha, it's probably too early to judge, but my first impression is not very positive.
Besides the problems I described, what bothers me the most is that Sencha Architect doesn't have code completion.

Feel free to throw a comment, especially if you had similar experiences with Sencha.

Sunday, August 18, 2013

Developing 3d game for Android - part 2

Well, it is done!

I finished version 1.0 of my 3D space arcade game based on the Rajawali framework and published it on Google Play.

You can check it out here: https://play.google.com/store/apps/details?id=com.mabuga.gravity.android.before4&hl=en

And here is a gameplay video on YouTube:





I enjoyed developing with Rajawali; it was a lot of fun and the issues I ran into were not critical.
The issue I spent the most time dealing with was rendering explosions. I wanted to use simple animated sprites for explosions, and Rajawali supports that via the Particle primitive. Basically, you set a spritesheet texture on a Particle, define how many tiles the spritesheet has, and the Rajawali shader will render the particle using the given spritesheet, creating an animation from its tiles.

What I wanted was just one big animated sprite to render an explosion. And that is where the problem appeared - the size of a single particle. Actually, there are two problems related to particle size.

1. There is a bug in the Particle code that causes a particle to look smaller as the distance from the world origin gets bigger, regardless of the camera position.

2. A Particle is rendered as a textured point, and unfortunately OpenGL implementations limit the size of a rendered point. Even worse, different implementations have different values for the point size limit. The OpenGL specification doesn't guarantee a point size bigger than 1 px.

My workaround for the first problem was to update the particle code (GL shader and Java) to fit my needs. Unfortunately, my solution did not solve the bug in a generic way (since the problem is not just a bug in the calculation but a conceptual one as well), so I could not contribute it back to the project.

The second problem cannot really be solved since it is hardware specific, so I just accepted the imposed point size limit. A quick test on the few devices I own showed me that the point size limit is around 100 px on my HTC Explorer with a 480x320 px screen and around 500 px on my Samsung Galaxy Tab 2 with a 1280x800 px screen. That was OK for my explosions; however, there are probably devices with a big screen and a small point size limit, and my explosions probably look bad on those.

Also, I experienced some weird rendering issues when returning from a RajawaliActivity (with a RajawaliRenderer) to a normal Activity, but that was solved by setting android:hardwareAccelerated to false on the activity elements in the Android manifest.

So, there were actually very few problems with the Rajawali framework.
Besides the issues, I would like to share some thoughts about the challenges of optimizing the game for devices with low CPU power.

A crucial part of almost any game is collision detection. Rajawali already has built-in collision detection features, but checking collisions too often can have a significant CPU cost, causing glitches in gameplay.
In every frame (about 30 times per second) we have to check the collision of every moving object with all other objects on the scene. Let's say that we have 30 moving objects (ships + bullets) and about 20-30 static objects (meteors) on the scene at the same time; this means we have to handle ~30 * 50 = 1500 checks per frame, which makes 45,000 per second. And that's a lot.
In order to minimize the number of collision checks per frame, I divided the game space into "sectors". For static objects I calculate which sector they are in only once, in the game init phase, while for moving objects I have to recalculate their sector every time they move.
Also, I added a global sector map that maps a sector name to a list of the objects currently in that sector. This map is also updated on every object sector calculation.
So, now for every moving object we only need to check collisions with the objects currently in the same sector, and this is usually 0-5 objects. This reduces CPU usage big time. And the number of collision checks doesn't increase at all if we add more static (non-moving) objects. A simplified sketch of the idea is shown below.
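Here is a simplified sketch of the sector idea (GameObject and its accessors are made-up names, this is not the actual game code):

private static final float SECTOR_SIZE = 50f;
private final Map<String, List<GameObject>> sectorMap = new HashMap<String, List<GameObject>>();

private String sectorKey(float x, float z) {
    // integer sector coordinates, e.g. "3:-2"
    return (int) Math.floor(x / SECTOR_SIZE) + ":" + (int) Math.floor(z / SECTOR_SIZE);
}

// called once at init time for static objects, and on every move for moving objects
private void updateSector(GameObject obj) {
    String newKey = sectorKey(obj.getX(), obj.getZ());
    if (!newKey.equals(obj.getSectorKey())) {
        List<GameObject> oldList = sectorMap.get(obj.getSectorKey());
        if (oldList != null) {
            oldList.remove(obj);
        }
        List<GameObject> newList = sectorMap.get(newKey);
        if (newList == null) {
            newList = new ArrayList<GameObject>();
            sectorMap.put(newKey, newList);
        }
        newList.add(obj);
        obj.setSectorKey(newKey);
    }
}

// collision candidates for a moving object are then only the objects in its own sector
List<GameObject> candidates = sectorMap.get(sectorKey(ship.getX(), ship.getZ()));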

Another thing I did in order to achieve better game performance was to handle object creation a little bit smarter than just creating objects whenever I need them. Actually, I recycle them.
Whenever a new bullet is fired or a new ship should appear, it has to be added to the scene, and whenever a ship is destroyed or a bullet hits something, it has to be removed from the scene. But adding and removing objects can also be costly, especially if it happens often. If we instantiate a new bullet every time a ship fires, we spend CPU cycles on initialization of the bullet. After that we have to add it to the scene (renderer), and this also takes some CPU time since the renderer must register the new object in its list of objects in a synchronized way. Rajawali uses a synchronized "copy on write" list for this purpose.

So, instead of creating and destroying objects all the time, I used the well known technique called "object pooling". It boils down to creating a certain number of objects and storing them in a pool (a list or a map), then taking an object from that pool when we need a new one and returning it when the object should be destroyed/removed.
In my case, I had two different pools, one for bullets and another for explosions. Using two pools for specific object types instead of one generic pool also makes things faster, since we don't have to do any type checking.
If a new object is required and the pool is empty, we simply create a new one (it will end up in the pool once it is recycled), so the pool grows depending on demand.

When an object needs to be removed from the scene, I don't actually remove it; I just hide it by setting its visibility to false and by setting its position far out of the camera view. I also remove it from my internal list of movable objects, so I don't move it or check collisions for it while it is in the pool (a kind of object passivation).

When taken from the pool, the object is set to visible, its position/rotation is updated as needed, and it is added to the movable objects list again, so it can be managed by the game logic (moving and collision checking).
And that's it. Simple, but effective. A minimal sketch of such a pool follows.
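A minimal sketch of the pooling idea (Bullet and its methods are hypothetical; the real game keeps one pool per object type):

private final List<Bullet> bulletPool = new ArrayList<Bullet>();

private Bullet obtainBullet() {
    if (bulletPool.isEmpty()) {
        return new Bullet();              // pool grows on demand
    }
    return bulletPool.remove(bulletPool.size() - 1);
}

private void recycleBullet(Bullet bullet) {
    bullet.setVisible(false);             // "passivate": hide it and park it far from the camera
    bullet.setPosition(0, -10000, 0);
    bulletPool.add(bullet);
}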

OpenGL handles rendering in the most optimized way it can, and besides the few tricks described above there are several other things a developer can do to make the game run smoothly:
- use low poly 3D models
- keep the number of objects on the scene as low as you can
- keep objects static (non movable) whenever possible

I covered some basic and common things related to game optimization, but every game is specific and has its own specific things that can be optimized.

Friday, July 5, 2013

Developing 3d game for Android

Developing a real 3D game is not a trivial task, no matter what platform you target.

Five or six years ago I was experimenting with JOGL - the Java Binding for the OpenGL API - trying to develop a 3D game in Java for the PC.
At that time the common opinion was that 3D games could only be developed in C++ because Java was too slow. Even then that was not true. JOGL delegates all rendering to OpenGL, and in the initialization phase you can create complex objects in OpenGL memory, so later you can reference them from Java and give short instructions to OpenGL in order to move or rotate an object. That means that Java only has to manage the positions and rotations of the objects, check collisions and of course run the game logic.

Since rendering is the most demanding part of a 3D game and it is handled by OpenGL, the fact that C++ is faster than Java doesn't matter so much. Of course, in Java you will probably have to optimize more, but it is definitely fast enough.

I was very happy with how my experiment turned out. Although I never finished and published the game, I managed to make a nice little space combat game with real LAN multiplayer, so players could fight each other in space. It had laser guns, missiles that follow their targets, radar, etc. It ran smoothly both on Windows and Linux.

Today, even cheap smartphones (like my HTC Explorer) have CPU power and memory similar to what my PC had at the time I was playing with JOGL.

Smartphones support OpenGL ES - the Standard for Embedded Accelerated 3D Graphics. Most phones support OpenGL ES 2.0; only older models still use OpenGL ES 1.x.

When working with JOGL, the developer actually has to know a lot about OpenGL, since JOGL is not a 3D engine; it is just a bridge between Java and OpenGL which allows us to call OpenGL commands from Java code.

In recent years several good 3D libraries emerged, making 3D app development much easier. Since they manage the low level OpenGL calls, you can focus more on your game logic.

There is a great OpenGL ES 2.0 based 3D engine for the Android platform called Rajawali, developed by Dennis Ippel (great job Dennis). There is also an Android demo app that demonstrates the Rajawali engine's features.

A basic feature that every 3D engine provides is loading an external 3D model, adding it to a 3D scene and manipulating its position and rotation. Most 3D engines provide many more features, including collision checking, physics, lighting, various visual effects, etc.

Rajawali's set of features amazed me, so I decided to try it out by making a simple 3D game that focuses on playability rather than on visual attraction. As a kid I loved games like Galaga and Phoenix because they were dynamic and addictive. My goal is to achieve a similar gameplay dynamic in a 3D world. Although I limited the ships' freedom of movement to a plane (they can't go up or down), the 3D perspective gives me more "space" (the player can see distant enemy ships much earlier than would be possible if the view were from above) and of course 3D models of ships and meteors look more realistic than anything that could be achieved in 2D.

Here are the first screenshots of the game:




So far everything works fine on the HTC Explorer (Android 2.3.5) and the HTC Desire C (Android 4.0).
I'll keep you posted about my progress here.

Sunday, June 23, 2013

Developing android apps with Kotlin

For those who haven't heard of it yet, Kotlin is a new JVM language developed by JetBrains, the creators of IntelliJ IDEA, which many people consider the best Java IDE.

Kotlin is still in development - a beta should be released in the coming months, according to the Kotlin forum - but it is already stable enough to do some real work with.

Having been a Java programmer for a long time, I have always liked it for the simplicity of the language itself, its stability, the large ecosystem of frameworks and libraries, cross-platform support etc...

But sometimes I missed a few things that other languages have and Java doesn't. Mostly things that would boost my productivity and let me write less code that is still elegant and readable.

One of those things is the ability to pass a function as an argument to another function (a feature known as higher order functions / closures). Writing listeners every time you need some kind of callback function can be really tedious. If you can pass a callback function as an argument to another function/method, it can really save time and lines of code. JavaScript and ActionScript, languages that I used for some client side development and that I considered inferior to Java in many ways, have had this for a long time.
Kotlin, like almost every other modern language, has this feature.

Another thing that can save code and time is having default values for method arguments, so those arguments can be omitted in the method call. This reduces, if not removes, the need for method overloading since it is a more flexible way of supporting different combinations of arguments.
I liked this feature in ActionScript, but Kotlin goes one step further: it supports default argument values, and it also supports passing arguments by name. This means, for example, that you can pass only the first and fourth argument, skipping the second and third if they have default values. In ActionScript, if you want to pass the fourth argument you also need to pass all the arguments before it.

There are lots of other features that make Kotlin code shorter and more elegant than Java code.

Elegance and ease of use are not Kotlin's only advantages over Java; there are also features that make Kotlin more powerful, although with greater power comes more responsibility, so in general it requires the programmer to be more careful.
To mention just a few of them: null-safety, immutability, extension functions, lambdas, inline functions, operator overloading, range expressions... etc.
To see the full list of Kotlin features check out the docs page and don't miss the comparison to Java.

Many modern languages have a feature list very similar to Kotlin's: Java 8, Scala, Ceylon, Clojure (Clojure is a little bit of a different animal since it's a Lisp dialect). So why choose Kotlin?

For a Java programmer, Kotlin should be the easiest language to switch to (except of course Java 8), since one of the main goals of Kotlin is to be Java-compatible. This means that Kotlin should provide much better interoperability with Java than Scala, Clojure or Ceylon...
Also, Kotlin compiles as fast as Java; that is another goal of the Kotlin authors.

Scala is Kotlin's main competitor, so it is good to know what the similarities and differences between them are.

Since Android apps run on Java, and Kotlin is highly interoperable with Java, it is possible to write Android apps in Kotlin.

Here are some useful links on how to start with it:



For now, using IntelliJ IDEA is the easiest way to develop with Kotlin. 

I am looking forward to becoming more familiar with Kotlin by using it on my new Android app, and I will share my experience here soon.