View Javadoc

1   // BSD License (http://www.galagosearch.org/license)
2   package org.galagosearch.tupleflow;
3   
4   import java.io.IOException;
5   import java.lang.reflect.Constructor;
6   import java.lang.reflect.Field;
7   import java.lang.reflect.InvocationTargetException;
8   import java.lang.reflect.Method;
9   
10  /***
11   *
12   * @author trevor
13   */
14  public class Linkage {
15      public static void link(Step source, Step stage, String fieldName) throws IncompatibleProcessorException {
16          try {
17              Class sourceClass = source.getClass();
18              Field processorField = sourceClass.getField(fieldName);
19              Class fieldClass = processorField.getType();
20  
21              // are they directly compatible?  if so, finish quickly
22              if (fieldClass.isInstance(stage)) {
23                  processorField.set(source, stage);
24                  return;
25              }
26  
27              // try to determine the kind of thing that the stage is
28              Class stageClass = stage.getClass();
29  
30              for (Class sourceInterfaceClass : sourceClass.getInterfaces()) {
31                  String interfaceName = sourceInterfaceClass.getName();
32                  String typeName;
33                  Object adapted = null;
34  
35                  try {
36                      if ((typeName = Utility.strip(interfaceName, "$Source")) != null) {
37                          // does the stage object have a ShreddedProcessor interface?
38                          done:
39                          for (Class stageInterfaceClass : stageClass.getInterfaces()) {
40                              String stageInterfaceName = stageInterfaceClass.getName();
41  
42                              if (stageInterfaceName.startsWith(typeName) &&
43                                      stageInterfaceName.endsWith("$ShreddedProcessor")) {
44                                  String stageOrderName = Utility.strip(stageInterfaceName,
45                                                                        "$ShreddedProcessor");
46                                  Constructor[] constructors = Class.forName(
47                                          stageOrderName + "$TupleShredder").getConstructors();
48  
49                                  for (Constructor c : constructors) {
50                                      Class[] types = c.getParameterTypes();
51  
52                                      if (types.length == 1 && types[0].isInstance(stage)) {
53                                          adapted = c.newInstance(stage);
54                                          break done;
55                                      }
56                                  }
57                              }
58                          }
59                      } else if ((typeName = Utility.strip(interfaceName, "$ShreddedSource")) != null) {
60                          Constructor[] constructors = Class.forName(typeName + "$TupleUnshredder").
61                                  getConstructors();
62  
63                          for (Constructor c : constructors) {
64                              Class[] types = c.getParameterTypes();
65  
66                              if (types.length == 1 && types[0].isInstance(stage)) {
67                                  adapted = c.newInstance(stage);
68                                  break;
69                              }
70                          }
71                      }
72                  } catch (Exception e) {
73                      // we're just checking to see if anything works, so exceptions here don't mean anything to us
74                      continue;
75                  }
76  
77                  if (adapted != null && fieldClass.isInstance(adapted)) {
78                      processorField.set(source, adapted);
79                      return;
80                  }
81              }
82  
83              throw new IncompatibleProcessorException("Stage of type '" + stage.getClass().getName() + "' cannot process the objects that '" + sourceClass.
84                                                       getName() + "' produces.");
85          } catch (NoSuchFieldException e) {
86              throw new IncompatibleProcessorException(
87                      "Stage of type '" + source.getClass().getName() + "' has no field called '" + fieldName + "' that can hold a processor object (or maybe it's just not public).");
88          } catch (IllegalAccessException e) {
89              throw new IncompatibleProcessorException(
90                      "Stage of type '" + source.getClass().getName() + "' has a field called '" + fieldName + "', but " +
91                      "it is private or protected and can't be modified by link().");
92          }
93      }
94  
95      public static void link(Step source, Step stage) throws IncompatibleProcessorException {
96          link(source, stage, "processor");
97      }
98  
99      public static void close(Step stage) throws IOException {
100         Method close;
101 
102         try {
103             close = stage.getClass().getMethod("close");
104         } catch (NoSuchMethodException e) {
105             return;
106         }
107 
108         try {
109             close.invoke(stage);
110         } catch (IllegalAccessException e) {
111             throw (IOException) new IOException(
112                     "Couldn't access close method (should declare it public)").initCause(e);
113         } catch (InvocationTargetException e) {
114             throw (IOException) new IOException("Problem when calling close method").initCause(e);
115         }
116     }
117 }