1
2 package org.galagosearch.tupleflow;
3
4 import java.io.IOException;
5 import java.lang.reflect.Constructor;
6 import java.lang.reflect.Field;
7 import java.lang.reflect.InvocationTargetException;
8 import java.lang.reflect.Method;
9
10 /***
11 *
12 * @author trevor
13 */
14 public class Linkage {
15 public static void link(Step source, Step stage, String fieldName) throws IncompatibleProcessorException {
16 try {
17 Class sourceClass = source.getClass();
18 Field processorField = sourceClass.getField(fieldName);
19 Class fieldClass = processorField.getType();
20
21
22 if (fieldClass.isInstance(stage)) {
23 processorField.set(source, stage);
24 return;
25 }
26
27
28 Class stageClass = stage.getClass();
29
30 for (Class sourceInterfaceClass : sourceClass.getInterfaces()) {
31 String interfaceName = sourceInterfaceClass.getName();
32 String typeName;
33 Object adapted = null;
34
35 try {
36 if ((typeName = Utility.strip(interfaceName, "$Source")) != null) {
37
38 done:
39 for (Class stageInterfaceClass : stageClass.getInterfaces()) {
40 String stageInterfaceName = stageInterfaceClass.getName();
41
42 if (stageInterfaceName.startsWith(typeName) &&
43 stageInterfaceName.endsWith("$ShreddedProcessor")) {
44 String stageOrderName = Utility.strip(stageInterfaceName,
45 "$ShreddedProcessor");
46 Constructor[] constructors = Class.forName(
47 stageOrderName + "$TupleShredder").getConstructors();
48
49 for (Constructor c : constructors) {
50 Class[] types = c.getParameterTypes();
51
52 if (types.length == 1 && types[0].isInstance(stage)) {
53 adapted = c.newInstance(stage);
54 break done;
55 }
56 }
57 }
58 }
59 } else if ((typeName = Utility.strip(interfaceName, "$ShreddedSource")) != null) {
60 Constructor[] constructors = Class.forName(typeName + "$TupleUnshredder").
61 getConstructors();
62
63 for (Constructor c : constructors) {
64 Class[] types = c.getParameterTypes();
65
66 if (types.length == 1 && types[0].isInstance(stage)) {
67 adapted = c.newInstance(stage);
68 break;
69 }
70 }
71 }
72 } catch (Exception e) {
73
74 continue;
75 }
76
77 if (adapted != null && fieldClass.isInstance(adapted)) {
78 processorField.set(source, adapted);
79 return;
80 }
81 }
82
83 throw new IncompatibleProcessorException("Stage of type '" + stage.getClass().getName() + "' cannot process the objects that '" + sourceClass.
84 getName() + "' produces.");
85 } catch (NoSuchFieldException e) {
86 throw new IncompatibleProcessorException(
87 "Stage of type '" + source.getClass().getName() + "' has no field called '" + fieldName + "' that can hold a processor object (or maybe it's just not public).");
88 } catch (IllegalAccessException e) {
89 throw new IncompatibleProcessorException(
90 "Stage of type '" + source.getClass().getName() + "' has a field called '" + fieldName + "', but " +
91 "it is private or protected and can't be modified by link().");
92 }
93 }
94
95 public static void link(Step source, Step stage) throws IncompatibleProcessorException {
96 link(source, stage, "processor");
97 }
98
99 public static void close(Step stage) throws IOException {
100 Method close;
101
102 try {
103 close = stage.getClass().getMethod("close");
104 } catch (NoSuchMethodException e) {
105 return;
106 }
107
108 try {
109 close.invoke(stage);
110 } catch (IllegalAccessException e) {
111 throw (IOException) new IOException(
112 "Couldn't access close method (should declare it public)").initCause(e);
113 } catch (InvocationTargetException e) {
114 throw (IOException) new IOException("Problem when calling close method").initCause(e);
115 }
116 }
117 }